Use consistent hash to manage the topic (#681)
ThisIsClark authored Sep 4, 2021
1 parent a88486e commit be004ce
Showing 9 changed files with 210 additions and 9 deletions.
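For background (not part of this commit), a minimal self-contained sketch of the consistent-hashing idea the change relies on: task ids are hashed onto a ring of member nodes, so a membership change only remaps the tasks whose ring position falls on the affected node. This is illustrative only; the commit itself delegates the ring to tooz, as shown in the diffs below.

# Illustrative consistent-hash ring (not delfin/tooz code).
import bisect
import hashlib


def _hash(key):
    return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)


class SimpleHashRing:
    """Toy ring; tooz's partitioner plays this role in the commit."""

    def __init__(self, members, replicas=100):
        # Give each member several virtual points on the ring for balance.
        self._ring = sorted((_hash('%s-%d' % (m, i)), m)
                            for m in members for i in range(replicas))
        self._keys = [h for h, _ in self._ring]

    def get_member(self, task_id):
        # The first virtual point clockwise from the task's hash owns it.
        idx = bisect.bisect(self._keys, _hash(task_id)) % len(self._ring)
        return self._ring[idx][1]


ring = SimpleHashRing(['node1', 'node2', 'node3'])
print(ring.get_member('fake_task_id'))  # e.g. 'node2', stable across calls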
30 changes: 30 additions & 0 deletions delfin/coordination.py
@@ -15,12 +15,14 @@
import inspect

import decorator
import tooz
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
import six
from tooz import coordination
from tooz import locking
from tooz import partitioner

from delfin import cryptor
from delfin import exception
@@ -320,3 +322,31 @@ def _get_redis_backend_url():
.format(backend_type=CONF.coordination.backend_type,
server=CONF.coordination.backend_server)
return backend_url


class ConsistentHashing(Coordinator):
GROUP_NAME = 'partitioner_group'

def __init__(self):
super(ConsistentHashing, self). \
__init__(agent_id=CONF.host, prefix="")

def join_group(self):
try:
self.coordinator.join_partitioned_group(self.GROUP_NAME)
except tooz.coordination.MemberAlreadyExist:
LOG.info('Member %s already in partitioner_group' % CONF.host)

def get_task_executor(self, task_id):
part = partitioner.Partitioner(self.coordinator, self.GROUP_NAME)
members = part.members_for_object(task_id)
for member in members:
LOG.info('For task id %s, host should be %s' % (task_id, member))
return member.decode('utf-8')

def register_watcher_func(self, on_node_join, on_node_leave):
self.coordinator.watch_join_group(self.GROUP_NAME, on_node_join)
self.coordinator.watch_leave_group(self.GROUP_NAME, on_node_leave)

def watch_group_change(self):
self.coordinator.run_watchers()
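
For reference, a hedged usage sketch of the new class (the method names come from the diff above; the coordination backend configured under CONF.coordination is assumed to be reachable, and the task id is illustrative):

# Illustrative only: how a worker process might use ConsistentHashing.
from delfin import coordination

partitioner = coordination.ConsistentHashing()
partitioner.start()        # connect to the configured tooz backend
partitioner.join_group()   # join 'partitioner_group'

# Ask the hash ring which member should execute a given task id.
executor = partitioner.get_task_executor('fake_task_id')
print('task should run on', executor)

partitioner.stop()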
6 changes: 5 additions & 1 deletion delfin/leader_election/distributor/task_distributor.py
@@ -19,6 +19,7 @@

from delfin import db
from delfin.common.constants import TelemetryCollection
from delfin.coordination import ConsistentHashing
from delfin.task_manager import metrics_rpcapi as task_rpcapi

CONF = cfg.CONF
@@ -47,7 +48,10 @@ def __call__(self):
six.text_type(e))

def distribute_new_job(self, task_id):
executor = CONF.host
partitioner = ConsistentHashing()
partitioner.start()
executor = partitioner.get_task_executor(task_id)
partitioner.stop()
try:
db.task_update(self.ctx, task_id, {'executor': executor})
LOG.info('Distribute a new job, id: %s' % task_id)
4 changes: 4 additions & 0 deletions delfin/task_manager/metrics_manager.py
@@ -17,6 +17,7 @@
from oslo_log import log

from delfin import manager
from delfin.coordination import ConsistentHashing
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import FailedJobHandler
@@ -38,6 +39,9 @@ def __init__(self, service_name=None, *args, **kwargs):
scheduler = schedule_manager.SchedulerManager()
scheduler.start()
JobHandler.schedule_boot_jobs()
partitioner = ConsistentHashing()
partitioner.start()
partitioner.join_group()

def assign_job(self, context, task_id):
instance = JobHandler.get_instance(context, task_id)
56 changes: 53 additions & 3 deletions delfin/task_manager/scheduler/schedule_manager.py
@@ -21,12 +21,15 @@
from oslo_utils import uuidutils

from delfin import context
from delfin import db
from delfin import service
from delfin import utils
from delfin.leader_election.distributor.failed_task_distributor\
from delfin.coordination import ConsistentHashing
from delfin.leader_election.distributor.failed_task_distributor \
import FailedTaskDistributor
from delfin.leader_election.distributor.task_distributor \
import TaskDistributor
from delfin.task_manager import metrics_rpcapi as task_rpcapi

LOG = log.getLogger(__name__)

@@ -38,6 +41,9 @@

@six.add_metaclass(utils.Singleton)
class SchedulerManager(object):

GROUP_CHANGE_DETECT_INTERVAL_SEC = 30

def __init__(self, scheduler=None):
if not scheduler:
scheduler = BackgroundScheduler()
@@ -47,6 +53,7 @@ def __init__(self, scheduler=None):
self.boot_jobs = dict()
self.boot_jobs_scheduled = False
self.ctx = context.get_admin_context()
self.task_rpcapi = task_rpcapi.TaskAPI()

def start(self):
""" Initialise the schedulers for periodic job creation
@@ -55,6 +62,40 @@ def start(self):
self.scheduler.start()
self.scheduler_started = True

def on_node_join(self, event):
# A new node joined the group, so all jobs are re-distributed.
# If a job is already on its target node, it is ignored and will
# not be scheduled again.
LOG.info('Member %s joined the group %s' % (event.member_id,
event.group_id))
# Get all the jobs
tasks = db.task_get_all(self.ctx)
distributor = TaskDistributor(self.ctx)
partitioner = ConsistentHashing()
partitioner.start()
for task in tasks:
# Get the specific executor
origin_executor = task['executor']
# If the target executor is different from current executor,
# remove the job from old executor and add it to new executor
new_executor = partitioner.get_task_executor(task['id'])
if new_executor != origin_executor:
LOG.info('Re-distribute job %s from %s to %s' %
(task['id'], origin_executor, new_executor))
self.task_rpcapi.remove_job(self.ctx, task['id'],
task['executor'])
distributor.distribute_new_job(task['id'])
partitioner.stop()

def on_node_leave(self, event):
LOG.info('Member %s left the group %s' % (event.member_id,
event.group_id))
filters = {'executor': event.member_id.decode('utf-8')}
re_distribute_tasks = db.task_get_all(self.ctx, filters=filters)
distributor = TaskDistributor(self.ctx)
for task in re_distribute_tasks:
distributor.distribute_new_job(task['id'])

def schedule_boot_jobs(self):
if not self.boot_jobs_scheduled:
try:
Expand Down Expand Up @@ -90,6 +131,13 @@ def schedule_boot_jobs(self):
'PerfJobManager',
coordination=True)
service.serve(job_generator)
partitioner = ConsistentHashing()
partitioner.start()
partitioner.register_watcher_func(self.on_node_join,
self.on_node_leave)
self.scheduler.add_job(partitioner.watch_group_change, 'interval',
seconds=self.GROUP_CHANGE_DETECT_INTERVAL_SEC,
next_run_time=datetime.now())

def stop(self):
"""Cleanup periodic jobs"""
@@ -104,5 +152,7 @@ def get_scheduler(self):
return self.scheduler

def recover_job(self):
# TODO: would be implement when implement the consistent hashing
pass
all_tasks = db.task_get_all(self.ctx)
distributor = TaskDistributor(self.ctx)
for task in all_tasks:
distributor.distribute_new_job(task['id'])
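
For reference, the watcher pattern used above — register join/leave callbacks on the partitioned group and poll tooz's watchers on a fixed interval — can be sketched in isolation roughly as follows (illustrative only: 'zake://' is tooz's in-memory test driver and is assumed to be installed; the 30-second interval mirrors GROUP_CHANGE_DETECT_INTERVAL_SEC):

# Minimal sketch of the watcher pattern, not the delfin wiring itself.
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler
from tooz import coordination

coordinator = coordination.get_coordinator('zake://', b'node-1')
coordinator.start()
coordinator.join_partitioned_group('partitioner_group')


def on_node_join(event):
    # In the diff above, this re-runs the hash ring over all tasks and
    # moves the ones whose owner changed.
    print('joined:', event.member_id, event.group_id)


def on_node_leave(event):
    # In the diff above, this re-distributes only the departed member's tasks.
    print('left:', event.member_id, event.group_id)


coordinator.watch_join_group('partitioner_group', on_node_join)
coordinator.watch_leave_group('partitioner_group', on_node_leave)

scheduler = BackgroundScheduler()
scheduler.add_job(coordinator.run_watchers, 'interval', seconds=30,
                  next_run_time=datetime.now())
scheduler.start()

Polling run_watchers from the existing APScheduler instance avoids a dedicated heartbeat thread; note that a join rebalances every task, while a leave only re-distributes the departed member's tasks.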
@@ -129,8 +129,6 @@ def remove_job(self, task_id):
job = db.task_get(self.ctx, task_id)
job_id = job['job_id']
self.remove_scheduled_job(job_id)
db.task_delete(self.ctx, job['id'])
LOG.info("Removed job %s ", job['id'])
except Exception as e:
LOG.error("Failed to remove periodic scheduling job , reason: %s.",
six.text_type(e))
@@ -40,12 +40,21 @@

class TestTaskDistributor(test.TestCase):

@mock.patch('delfin.coordination.ConsistentHashing.stop')
@mock.patch('delfin.coordination.ConsistentHashing.get_task_executor')
@mock.patch('delfin.coordination.ConsistentHashing.start')
@mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job')
@mock.patch.object(db, 'task_update')
@mock.patch(
'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job')
def test_distribute_new_job(self, mock_task_update, mock_assign_job):
@mock.patch('delfin.coordination.ConsistentHashing.__init__',
mock.Mock(return_value=None))
def test_distribute_new_job(self, mock_task_update, mock_assign_job,
mock_partitioner_start,
mock_get_task_executor, mock_partitioner_stop):
ctx = context.get_admin_context()
task_distributor = TaskDistributor(ctx)
task_distributor.distribute_new_job('fake_task_id')
self.assertEqual(mock_assign_job.call_count, 1)
self.assertEqual(mock_task_update.call_count, 1)
self.assertEqual(mock_partitioner_start.call_count, 1)
self.assertEqual(mock_get_task_executor.call_count, 1)
self.assertEqual(mock_partitioner_stop.call_count, 1)
75 changes: 75 additions & 0 deletions delfin/tests/unit/task_manager/scheduler/test_scheduler.py
@@ -12,11 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import mock

from apscheduler.schedulers.background import BackgroundScheduler

from delfin import db
from delfin import test
from delfin.coordination import ConsistentHashing
from delfin.leader_election.distributor.task_distributor \
import TaskDistributor
from delfin.task_manager.metrics_rpcapi import TaskAPI
from delfin.task_manager.scheduler import schedule_manager

FAKE_TASKS = [
{
'id': 1,
'executor': 'node1'
},
{
'id': 2,
'executor': 'node2'
},
{
'id': 3,
'executor': 'node1'
}
]


class TestScheduler(test.TestCase):

@@ -28,3 +50,56 @@ def test_scheduler_manager_singleton(self):
self.assertIsInstance(second_instance, BackgroundScheduler)

self.assertEqual(first_instance, second_instance)

@mock.patch.object(BackgroundScheduler, 'start')
def test_start(self, mock_scheduler_start):
manager = schedule_manager.SchedulerManager()
manager.start()
self.assertEqual(mock_scheduler_start.call_count, 1)
manager.start()
self.assertEqual(mock_scheduler_start.call_count, 1)

@mock.patch('tooz.coordination.get_coordinator', mock.Mock())
@mock.patch.object(ConsistentHashing, 'get_task_executor')
@mock.patch.object(TaskAPI, 'remove_job')
@mock.patch.object(TaskDistributor, 'distribute_new_job')
@mock.patch.object(db, 'task_get_all')
def test_on_node_join(self, mock_task_get_all, mock_distribute_new_job,
mock_remove_job, mock_get_task_executor):
node1_job_count = 0
node2_job_count = 0
for job in FAKE_TASKS:
if job['executor'] == 'node1':
node1_job_count += 1
elif job['executor'] == 'node2':
node2_job_count += 1
mock_task_get_all.return_value = FAKE_TASKS
mock_get_task_executor.return_value = 'node1'
manager = schedule_manager.SchedulerManager()
manager.on_node_join(mock.Mock(member_id=b'fake_member_id',
group_id='node1'))
self.assertEqual(mock_task_get_all.call_count, 1)
self.assertEqual(mock_distribute_new_job.call_count,
node1_job_count + node2_job_count)
self.assertEqual(mock_remove_job.call_count, node2_job_count)
self.assertEqual(mock_get_task_executor.call_count,
node1_job_count + node2_job_count)

@mock.patch.object(TaskDistributor, 'distribute_new_job')
@mock.patch.object(db, 'task_get_all')
def test_on_node_leave(self, mock_task_get_all, mock_distribute_new_job):
mock_task_get_all.return_value = FAKE_TASKS
manager = schedule_manager.SchedulerManager()
manager.on_node_leave(mock.Mock(member_id=b'fake_member_id',
group_id='fake_group_id'))
self.assertEqual(mock_task_get_all.call_count, 1)
self.assertEqual(mock_distribute_new_job.call_count, len(FAKE_TASKS))

@mock.patch.object(TaskDistributor, 'distribute_new_job')
@mock.patch.object(db, 'task_get_all')
def test_recover_job(self, mock_task_get_all, mock_distribute_new_job):
mock_task_get_all.return_value = FAKE_TASKS
manager = schedule_manager.SchedulerManager()
manager.recover_job()
self.assertEqual(mock_task_get_all.call_count, 1)
self.assertEqual(mock_distribute_new_job.call_count, len(FAKE_TASKS))
30 changes: 30 additions & 0 deletions delfin/tests/unit/test_coordination.py
@@ -118,3 +118,33 @@ def func(foo, bar):
bar.__getitem__.return_value = 8
func(foo, bar)
get_lock.assert_called_with('lock-func-7-8')


class ConsistentHashingTestCase(test.TestCase):

def setUp(self):
super(ConsistentHashingTestCase, self).setUp()
self.get_coordinator = self.mock_object(tooz_coordination,
'get_coordinator')

def test_join_group(self):
crd = self.get_coordinator.return_value
part = coordination.ConsistentHashing()
part.start()
part.join_group()
self.assertTrue(crd.join_partitioned_group.called)

def test_register_watcher_func(self):
crd = self.get_coordinator.return_value
part = coordination.ConsistentHashing()
part.start()
part.register_watcher_func(mock.Mock(), mock.Mock())
self.assertTrue(crd.watch_join_group.called)
self.assertTrue(crd.watch_leave_group.called)

def test_watch_group_change(self):
crd = self.get_coordinator.return_value
part = coordination.ConsistentHashing()
part.start()
part.watch_group_change()
self.assertTrue(crd.run_watchers.called)
1 change: 1 addition & 0 deletions requirements.txt
@@ -40,3 +40,4 @@ APScheduler~=3.6.3
flask
kafka-python
importlib-metadata==3.7.0; python_version < "3.8"
tenacity==6.3.1
