diff --git a/delfin/common/config.py b/delfin/common/config.py index 5ef18d371..6c8bad8e8 100644 --- a/delfin/common/config.py +++ b/delfin/common/config.py @@ -119,6 +119,24 @@ .DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE, help='default history(in sec) to be collected on a job ' 'reschedule'), + cfg.BoolOpt('enable_dynamic_subprocess', + default=False, + help='Enable dynamic subprocess metrics collection'), + cfg.IntOpt('process_cleanup_interval', + default=60, + help='Background process cleanup call interval in sec'), + cfg.IntOpt('task_cleanup_delay', + default=10, + help='Delay for task cleanup before killing child in sec'), + cfg.IntOpt('group_change_detect_interval', + default=30, + help='Local executor group change detect interval in sec'), + cfg.IntOpt('max_storages_in_child', + default=5, + help='Max storages handled by one local executor process'), + cfg.IntOpt('max_childs_in_node', + default=100000, + help='Max processes that can be spawned before forcing fail'), ] CONF.register_opts(telemetry_opts, "telemetry") diff --git a/delfin/coordination.py b/delfin/coordination.py index b45ba834f..b108294a1 100644 --- a/delfin/coordination.py +++ b/delfin/coordination.py @@ -15,7 +15,7 @@ import inspect import decorator -import tooz + from oslo_config import cfg from oslo_log import log from oslo_utils import uuidutils @@ -334,7 +334,7 @@ def __init__(self): def join_group(self): try: self.coordinator.join_partitioned_group(self.GROUP_NAME) - except tooz.coordination.MemberAlreadyExist: + except coordination.MemberAlreadyExist: LOG.info('Member %s already in partitioner_group' % CONF.host) def get_task_executor(self, task_id): @@ -350,3 +350,53 @@ def register_watcher_func(self, on_node_join, on_node_leave): def watch_group_change(self): self.coordinator.run_watchers() + + +class GroupMembership(Coordinator): + + def __init__(self, agent_id): + super(GroupMembership, self). 
\ + __init__(agent_id=agent_id, prefix="") + + def create_group(self, group): + try: + self.coordinator.create_group(group.encode()).get() + except coordination.GroupAlreadyExist: + LOG.info("Group {0} already exist".format(group)) + + def delete_group(self, group): + try: + self.coordinator.delete_group(group.encode()).get() + except coordination.GroupNotCreated: + LOG.info("Group {0} not created".format(group)) + except coordination.GroupNotEmpty: + LOG.info("Group {0} not empty".format(group)) + except coordination.ToozError: + LOG.info("Group {0} internal error while delete".format(group)) + + def join_group(self, group): + try: + self.coordinator.join_group(group.encode()).get() + except coordination.MemberAlreadyExist: + LOG.info('Member %s already in group' % group) + + def leave_group(self, group): + try: + self.coordinator.leave_group(group.encode()).get() + except coordination.GroupNotCreated: + LOG.info('Group %s not created' % group) + + def get_members(self, group): + try: + return self.coordinator.get_members(group.encode()).get() + except coordination.GroupNotCreated: + LOG.info('Group %s not created' % group) + + return None + + def register_watcher_func(self, group, on_process_join, on_process_leave): + self.coordinator.watch_join_group(group.encode(), on_process_join) + self.coordinator.watch_leave_group(group.encode(), on_process_leave) + + def watch_group_change(self): + self.coordinator.run_watchers() diff --git a/delfin/service.py b/delfin/service.py index 0e993c803..499a747c6 100644 --- a/delfin/service.py +++ b/delfin/service.py @@ -278,6 +278,29 @@ def start(self): super(TaskService, self).start() +class MetricsService(Service): + """Service object for triggering metrics manager functionalities. + """ + + @classmethod + def create(cls, host=None, binary=None, topic=None, + manager=None, periodic_interval=None, + periodic_fuzzy_delay=None, service_name=None, + coordination=False, *args, **kwargs): + service_obj = super(MetricsService, cls).create( + host=host, binary=binary, topic=topic, manager=manager, + periodic_interval=periodic_interval, + periodic_fuzzy_delay=periodic_fuzzy_delay, + service_name=service_name, + coordination=coordination, *args, **kwargs) + + return service_obj + + def start(self): + super(MetricsService, self).start() + self.manager.init_scheduler(self.topic, self.host) + + class LeaderElectionService(service.Service): """Leader election service for distributed system diff --git a/delfin/task_manager/metrics_manager.py b/delfin/task_manager/metrics_manager.py index 5ef306db4..dbd373990 100644 --- a/delfin/task_manager/metrics_manager.py +++ b/delfin/task_manager/metrics_manager.py @@ -14,18 +14,30 @@ """ periodical task manager for metric collection tasks** """ +from apscheduler.schedulers.background import BackgroundScheduler +import datetime +import six + from oslo_log import log +from oslo_config import cfg +from oslo_utils import uuidutils +from oslo_service import service as oslo_ser +from delfin import context as ctxt +from delfin.coordination import ConsistentHashing, GroupMembership +from delfin import db +from delfin import exception from delfin import manager -from delfin.coordination import ConsistentHashing +from delfin import service from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager import subprocess_rpcapi as rpcapi from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \ import FailedJobHandler from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \ import JobHandler 
-from delfin.task_manager.tasks import telemetry LOG = log.getLogger(__name__) +CONF = cfg.CONF class MetricsTaskManager(manager.Manager): @@ -34,27 +46,288 @@ class MetricsTaskManager(manager.Manager): RPC_API_VERSION = '1.0' def __init__(self, service_name=None, *args, **kwargs): - self.telemetry_task = telemetry.TelemetryTask() super(MetricsTaskManager, self).__init__(*args, **kwargs) scheduler = schedule_manager.SchedulerManager() scheduler.start() - JobHandler.schedule_boot_jobs() partitioner = ConsistentHashing() partitioner.start() partitioner.join_group() + self.watch_job_id = None + self.cleanup_job_id = None + self.group = None + self.watcher = None + self.scheduler = None + self.rpcapi = rpcapi.SubprocessAPI() + self.executor_map = {} + self.enable_sub_process = CONF.telemetry.enable_dynamic_subprocess + if self.enable_sub_process: + self.scheduler = BackgroundScheduler() + self.scheduler.start() + self.schedule_boot_jobs(self.host) + + def assign_job(self, context, task_id, executor): + if not self.enable_sub_process: + instance = JobHandler.get_instance(context, task_id) + instance.schedule_job(task_id) + else: + if not self.watch_job_id: + self.init_watchers(executor) + local_executor = self.get_local_executor( + context, task_id, None, executor) + self.rpcapi.assign_job_local(context, task_id, local_executor) + + def remove_job(self, context, task_id, executor): + if not self.enable_sub_process: + instance = JobHandler.get_instance(context, task_id) + instance.remove_job(task_id) + else: + job = db.task_get(context, task_id) + storage_id = job['storage_id'] + for name in self.executor_map.keys(): + if storage_id in self.executor_map[name]["storages"]: + local_executor = "{0}:{1}".format(executor, name) + self.rpcapi.remove_job_local( + context, task_id, local_executor) + tasks, failed_tasks = self.get_all_tasks(storage_id) + if len(failed_tasks) == 0 and len(tasks) == 0: + self.stop_executor(name, local_executor, storage_id) + + def assign_failed_job(self, context, failed_task_id, executor): + if not self.enable_sub_process: + instance = FailedJobHandler.get_instance(context, failed_task_id) + instance.schedule_failed_job(failed_task_id) + else: + if not self.watch_job_id: + self.init_watchers(executor) + + local_executor = self.get_local_executor( + context, None, failed_task_id, executor) + self.rpcapi.assign_failed_job_local( + context, failed_task_id, local_executor) + + def remove_failed_job(self, context, failed_task_id, executor): + if not self.enable_sub_process: + instance = FailedJobHandler.get_instance(context, failed_task_id) + instance.remove_failed_job(failed_task_id) + else: + job = db.failed_task_get(context, failed_task_id) + storage_id = job['storage_id'] + for name in self.executor_map.keys(): + if storage_id in self.executor_map[name]["storages"]: + local_executor = "{0}:{1}".format(executor, name) + self.rpcapi.remove_failed_job_local( + context, failed_task_id, local_executor) + tasks, failed_tasks = self.get_all_tasks(storage_id) + if len(failed_tasks) == 0 and len(tasks) == 0: + self.stop_executor(name, local_executor, storage_id) + + def schedule_boot_jobs(self, executor): + """Schedule periodic collection if any task is currently assigned to + this executor """ + try: + filters = {'executor': executor, + 'deleted': False} + context = ctxt.get_admin_context() + tasks = db.task_get_all(context, filters=filters) + failed_tasks = db.failed_task_get_all(context, filters=filters) + LOG.info("Scheduling boot time jobs for this executor: total " + "jobs to 
be handled :%s" % len(tasks)) + for task in tasks: + self.assign_job(context, task['id'], executor) + LOG.debug('Periodic collection job assigned for id: ' + '%s ' % task['id']) + for failed_task in failed_tasks: + self.assign_failed_job(context, failed_task['id'], executor) + LOG.debug('Failed job assigned for id: ' + '%s ' % failed_task['id']) + + except Exception as e: + LOG.error("Failed to schedule boot jobs for this executor " + "reason: %s.", + six.text_type(e)) + else: + LOG.debug("Boot job scheduling completed.") + + def init_watchers(self, group): + watcher = GroupMembership(agent_id=group) + watcher.start() + watcher.create_group(group) + LOG.info('Created child process membership group {0}.' + 'Initial members of group: {1}' + .format(group, watcher.get_members(group))) + + watcher.register_watcher_func(group, + self.on_process_join, + self.on_process_leave) + self.group = group + self.watcher = watcher + self.watch_job_id = uuidutils.generate_uuid() + self.scheduler.add_job(watcher.watch_group_change, 'interval', + seconds=CONF.telemetry. + group_change_detect_interval, + next_run_time=datetime.datetime.now(), + id=self.watch_job_id) + LOG.info('Created watch for group membership change for group {0}.' + .format(group)) + self.cleanup_job_id = uuidutils.generate_uuid() + self.scheduler.add_job(self.process_cleanup, 'interval', + seconds=CONF.telemetry.process_cleanup_interval, + next_run_time=datetime.datetime.now(), + id=self.cleanup_job_id) + LOG.info('Created process cleanup background job for group {0}.' + .format(group)) + + def on_process_join(self, event): + LOG.info('Member %s joined the group %s' % (event.member_id, + event.group_id)) + host = event.group_id.decode('utf-8') + if self.watcher: + LOG.info('Processes in current node {0}' + .format(self.watcher.get_members(host))) + + def on_process_leave(self, event): + LOG.info('Member %s left the group %s' % (event.member_id, + event.group_id)) + executor_topic = event.member_id.decode('utf-8') + name = executor_topic.split(':')[1] + if name in self.executor_map.keys(): + host = event.group_id.decode('utf-8') + LOG.info("Re-create process {0} in {1} that is handling tasks" + .format(executor_topic, host)) + launcher = self.create_process(executor_topic, host) + self.executor_map[name]["launcher"] = launcher + context = ctxt.get_admin_context() + for storage_id in self.executor_map[name]["storages"]: + tasks, failed_tasks = self.get_all_tasks(storage_id) + for task in tasks: + LOG.info("Re-scheduling task {0} of storage {1}" + .format(task['id'], storage_id)) + self.rpcapi.assign_job_local( + context, task['id'], executor_topic) + + for f_task in failed_tasks: + LOG.info("Re-scheduling failed failed task {0}," + " of storage {1}" + .format(f_task['id'], storage_id)) + self.rpcapi.assign_failed_job_local( + context, f_task['id'], executor_topic) + + def process_cleanup(self): + LOG.info('Periodic process cleanup called') + executor_names = self.executor_map.keys() + + # Collect all names to delete + names_to_delete = [] + for name in executor_names: + if len(self.executor_map[name]["storages"]) == 0: + delay = self.executor_map[name]["cleanup_delay"] + if delay < 0: + LOG.info("Cleanup delay for local executor {0} expired" + .format(name)) + names_to_delete.append(name) + else: + LOG.info("Delay cleanup for local executor {0} for {1}" + .format(name, delay)) + delay = delay - CONF.telemetry.process_cleanup_interval + self.executor_map[name]["cleanup_delay"] = delay + # Delete names + for name in names_to_delete: + 
self.executor_map[name]["launcher"].stop() + self.executor_map.pop(name) + + def create_process(self, topic=None, host=None): + metrics_task_server = service. \ + MetricsService.create(binary='delfin-task', + topic=topic, + host=host, + manager='delfin.' + 'task_manager.' + 'subprocess_manager.' + 'SubprocessManager', + coordination=False) + launcher = oslo_ser.ProcessLauncher(CONF) + launcher.launch_service(metrics_task_server, workers=1) + return launcher + + def get_local_executor(self, context, task_id, failed_task_id, executor): + executor_names = self.executor_map.keys() + storage_id = None + if task_id: + job = db.task_get(context, task_id) + storage_id = job['storage_id'] + elif failed_task_id: + job = db.failed_task_get(context, failed_task_id) + storage_id = job['storage_id'] + else: + raise exception.InvalidInput("Missing task id") + + # Storage already exists + for name in executor_names: + executor_topic = "{0}:{1}".format(executor, name) + if storage_id in self.executor_map[name]["storages"]: + return executor_topic + + # Return existing executor_topic + for name in executor_names: + no_of_storages = len(self.executor_map[name]["storages"]) + if no_of_storages and (no_of_storages < + CONF.telemetry.max_storages_in_child): + executor_topic = "{0}:{1}".format(executor, name) + LOG.info("Selecting existing local executor {0} for {1}" + .format(executor_topic, storage_id)) + self.executor_map[name]["storages"].append(storage_id) + return executor_topic + + # Return executor_topic after creating one + for index in range(CONF.telemetry.max_childs_in_node): + name = "executor_{0}".format(index + 1) + if name not in executor_names: + executor_topic = "{0}:{1}".format(executor, name) + LOG.info("Create a new local executor {0} for {1}" + .format(executor_topic, storage_id)) + launcher = self.create_process( + topic=executor_topic, host=executor) + self.executor_map[name] = { + "storages": [storage_id], + "launcher": launcher, + "cleanup_delay": 0 + } + return executor_topic - def assign_job(self, context, task_id): - instance = JobHandler.get_instance(context, task_id) - instance.schedule_job(task_id) + msg = "Reached maximum number of ({0}) local executors". 
\ + format(CONF.telemetry.max_childs_in_node) + LOG.error(msg) + raise RuntimeError(msg) - def remove_job(self, context, task_id): - instance = JobHandler.get_instance(context, task_id) - instance.remove_job(task_id) + def get_all_tasks(self, storage_id): + filters = {'storage_id': storage_id, + 'deleted': False} + context = ctxt.get_admin_context() + tasks = db.task_get_all(context, filters=filters) + failed_tasks = db.failed_task_get_all(context, filters=filters) + return tasks, failed_tasks - def assign_failed_job(self, context, failed_task_id): - instance = FailedJobHandler.get_instance(context, failed_task_id) - instance.schedule_failed_job(failed_task_id) + def stop_executor(self, name, local_executor, storage_id): + LOG.info("Stop and remove local executor {0}" + .format(local_executor)) + if storage_id in self.executor_map[name]["storages"]: + self.executor_map[name]["storages"].remove(storage_id) + self.executor_map[name]["cleanup_delay"] = \ + CONF.telemetry.task_cleanup_delay - def remove_failed_job(self, context, failed_task_id): - instance = FailedJobHandler.get_instance(context, failed_task_id) - instance.remove_failed_job(failed_task_id) + def stop(self): + """Cleanup periodic jobs""" + if self.watch_job_id: + self.scheduler.remove_job(self.watch_job_id) + if self.cleanup_job_id: + self.scheduler.remove_job(self.cleanup_job_id) + if self.group and self.watcher: + self.watcher.delete_group(self.group) + if self.watcher: + self.watcher.stop() + if self.scheduler: + self.scheduler.shutdown() + self.watch_job_id = None + self.cleanup_job_id = None + self.group = None + self.watcher = None diff --git a/delfin/task_manager/metrics_rpcapi.py b/delfin/task_manager/metrics_rpcapi.py index ea2b96b02..220b1f09e 100644 --- a/delfin/task_manager/metrics_rpcapi.py +++ b/delfin/task_manager/metrics_rpcapi.py @@ -51,28 +51,30 @@ def assign_job(self, context, task_id, executor): call_context = rpc_client.prepare(topic=str(executor), version='1.0', fanout=True) return call_context.cast(context, 'assign_job', - task_id=task_id) + task_id=task_id, executor=executor) def remove_job(self, context, task_id, executor): rpc_client = self.get_client(str(executor)) call_context = rpc_client.prepare(topic=str(executor), version='1.0', fanout=True) return call_context.cast(context, 'remove_job', - task_id=task_id) + task_id=task_id, executor=executor) def assign_failed_job(self, context, failed_task_id, executor): rpc_client = self.get_client(str(executor)) call_context = rpc_client.prepare(topic=str(executor), version='1.0', fanout=True) return call_context.cast(context, 'assign_failed_job', - failed_task_id=failed_task_id) + failed_task_id=failed_task_id, + executor=executor) def remove_failed_job(self, context, failed_task_id, executor): rpc_client = self.get_client(str(executor)) call_context = rpc_client.prepare(topic=str(executor), version='1.0', fanout=True) return call_context.cast(context, 'remove_failed_job', - failed_task_id=failed_task_id) + failed_task_id=failed_task_id, + executor=executor) def create_perf_job(self, context, task_id): rpc_client = self.get_client('JobGenerator') diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py index 34092b211..225fc2559 100644 --- a/delfin/task_manager/scheduler/schedule_manager.py +++ b/delfin/task_manager/scheduler/schedule_manager.py @@ -87,9 +87,10 @@ def on_node_join(self, event): if new_executor != origin_executor: LOG.info('Re-distribute failed_job %s from %s to %s' % 
(failed_task['id'], origin_executor, new_executor)) - self.task_rpcapi.remove_job(self.ctx, task['id'], - task['executor']) - distributor.distribute_failed_job(failed_task['id']) + self.task_rpcapi.remove_failed_job( + self.ctx, failed_task['id'], failed_task['executor']) + distributor.distribute_failed_job(failed_task['id'], + task['executor']) partitioner.stop() def on_node_leave(self, event): diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py index d5aa5b5d4..e94ac3f29 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py @@ -19,7 +19,7 @@ from oslo_log import log from oslo_utils import uuidutils, importutils -from delfin import db, context +from delfin import db from delfin.common.constants import TelemetryCollection, TelemetryJobStatus from delfin.exception import TaskNotFound from delfin.task_manager import rpcapi as task_rpcapi @@ -49,38 +49,6 @@ def get_instance(ctx, task_id): return JobHandler(ctx, task_id, task['storage_id'], task['args'], task['interval']) - @staticmethod - def schedule_boot_jobs(): - """Schedule periodic collection if any task is currently assigned to - this executor """ - try: - - filters = {'executor': CONF.host, - 'deleted': False} - ctxt = context.get_admin_context() - tasks = db.task_get_all(ctxt, filters=filters) - failed_tasks = db.failed_task_get_all(ctxt, filters=filters) - LOG.info("Scheduling boot time jobs for this executor: total " - "jobs to be handled :%s" % len(tasks)) - for task in tasks: - instance = JobHandler.get_instance(ctxt, task['id']) - instance.schedule_job(task['id']) - LOG.debug('Periodic collection job assigned for id: ' - '%s ' % task['id']) - for failed_task in failed_tasks: - instance = FailedJobHandler.get_instance(ctxt, - failed_task['id']) - instance.schedule_failed_job(failed_task['id']) - LOG.debug('Failed job assigned for id: ' - '%s ' % failed_task['id']) - - except Exception as e: - LOG.error("Failed to schedule boot jobs for this executor " - "reason: %s.", - six.text_type(e)) - else: - LOG.debug("Boot job scheduling completed.") - def schedule_job(self, task_id): if self.stopped: diff --git a/delfin/task_manager/subprocess_manager.py b/delfin/task_manager/subprocess_manager.py new file mode 100644 index 000000000..c8709cbac --- /dev/null +++ b/delfin/task_manager/subprocess_manager.py @@ -0,0 +1,63 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Subprocess metrics manager for metric collection tasks** +""" + +from oslo_log import log +from oslo_config import cfg + +from delfin.coordination import GroupMembership +from delfin import manager +from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \ + import FailedJobHandler +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \ + import JobHandler + + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class SubprocessManager(manager.Manager): + """manage periodical collection tasks in subprocesses""" + + RPC_API_VERSION = '1.0' + + def __init__(self, service_name=None, *args, **kwargs): + super(SubprocessManager, self).__init__(*args, **kwargs) + + def init_scheduler(self, topic, host): + scheduler = schedule_manager.SchedulerManager() + scheduler.start() + watcher = GroupMembership(topic) + watcher.start() + watcher.join_group(host) + + def assign_job_local(self, context, task_id): + instance = JobHandler.get_instance(context, task_id) + instance.schedule_job(task_id) + + def remove_job_local(self, context, task_id): + instance = JobHandler.get_instance(context, task_id) + instance.remove_job(task_id) + + def assign_failed_job_local(self, context, failed_task_id): + instance = FailedJobHandler.get_instance(context, failed_task_id) + instance.schedule_failed_job(failed_task_id) + + def remove_failed_job_local(self, context, failed_task_id): + instance = FailedJobHandler.get_instance(context, failed_task_id) + instance.remove_failed_job(failed_task_id) diff --git a/delfin/task_manager/subprocess_rpcapi.py b/delfin/task_manager/subprocess_rpcapi.py new file mode 100644 index 000000000..62e19b4d2 --- /dev/null +++ b/delfin/task_manager/subprocess_rpcapi.py @@ -0,0 +1,75 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Client side of the subprocess metrics collection manager RPC API. +""" + +import oslo_messaging as messaging +from oslo_config import cfg + +from delfin import rpc + +CONF = cfg.CONF + + +class SubprocessAPI(object): + """Client side of the subprocess metrics manager collection rpc API. + + API version history: + + 1.0 - Initial version. 
+ """ + + RPC_API_VERSION = '1.0' + + def __init__(self): + super(SubprocessAPI, self).__init__() + self.target = messaging.Target(topic=CONF.host, + version=self.RPC_API_VERSION) + self.client = rpc.get_client(self.target, + version_cap=self.RPC_API_VERSION) + + def get_client(self, topic): + target = messaging.Target(topic=topic, + version=self.RPC_API_VERSION) + return rpc.get_client(target, version_cap=self.RPC_API_VERSION) + + def assign_job_local(self, context, task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=False) + return call_context.cast(context, 'assign_job_local', + task_id=task_id) + + def remove_job_local(self, context, task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=False) + return call_context.cast(context, 'remove_job_local', + task_id=task_id) + + def assign_failed_job_local(self, context, failed_task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=False) + return call_context.cast(context, 'assign_failed_job_local', + failed_task_id=failed_task_id) + + def remove_failed_job_local(self, context, failed_task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=False) + return call_context.cast(context, 'remove_failed_job_local', + failed_task_id=failed_task_id) diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py index e3ea65123..9e24fcb2b 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py @@ -139,18 +139,6 @@ def test_telemetry_removal_success(self, mock_log_error): telemetry_job.remove_job(fake_telemetry_job['id']) self.assertEqual(mock_log_error.call_count, 0) - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=fake_telemetry_jobs)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job') - def test_schedule_boot_jobs(self, mock_add_job): - JobHandler.schedule_boot_jobs() - self.assertEqual(mock_add_job.call_count, 1) - class TestFailedTelemetryJob(test.TestCase): diff --git a/delfin/tests/unit/task_manager/test_telemetry.py b/delfin/tests/unit/task_manager/test_telemetry.py index f392f422f..a5565bacb 100644 --- a/delfin/tests/unit/task_manager/test_telemetry.py +++ b/delfin/tests/unit/task_manager/test_telemetry.py @@ -18,6 +18,12 @@ from delfin import exception from delfin import test from delfin.task_manager.tasks import telemetry +from delfin.task_manager.metrics_manager import MetricsTaskManager +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \ + import JobHandler, FailedJobHandler +from apscheduler.schedulers.background import BackgroundScheduler +from delfin.task_manager.subprocess_rpcapi import SubprocessAPI + fake_storage = { 'id': '12c2d52f-01bc-41f5-b73f-7abf6f38a2a6', @@ -70,3 +76,142 @@ def test_performance_collection_failure(self, mock_collect_perf_metrics, # when collect metric fails 
self.assertEqual(mock_dispatch.call_count, 0) self.assertEqual(mock_log_error.call_count, 1) + + @mock.patch.object(SubprocessAPI, 'assign_job_local') + @mock.patch.object(db, 'task_get') + @mock.patch.object(JobHandler, 'schedule_job') + @mock.patch.object(MetricsTaskManager, 'schedule_boot_jobs') + @mock.patch.object(MetricsTaskManager, 'create_process') + def test_metric_manager_assign_job(self, mock_create, mock_boot_job, + mock_job_schedule, mock_db, + mock_subprocess_api): + mock_db.return_value = { + 'storage_id': 'storage_id1', + 'args': 'args', + 'interval': 10, + } + mock_create.return_value = None + mock_boot_job.return_value = None + mock_job_schedule.return_value = None + mock_subprocess_api.return_value = None + + mgr = MetricsTaskManager() + mgr.enable_sub_process = False + + mgr.assign_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + + mgr.enable_sub_process = True + mgr.scheduler = BackgroundScheduler() + mgr.scheduler.start() + mgr.assign_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + self.assertEqual(mock_subprocess_api.call_count, 1) + + @mock.patch.object(SubprocessAPI, 'remove_job_local') + @mock.patch.object(db, 'task_get') + @mock.patch.object(JobHandler, 'remove_job') + @mock.patch.object(MetricsTaskManager, 'schedule_boot_jobs') + @mock.patch.object(MetricsTaskManager, 'create_process') + def test_metric_manager_remove_job(self, mock_create, mock_boot_job, + mock_job_schedule, mock_db, + mock_subprocess_api): + mock_db.return_value = { + 'storage_id': 'storage_id1', + 'args': 'args', + 'interval': 10, + } + mock_create.return_value = None + mock_boot_job.return_value = None + mock_job_schedule.return_value = None + mock_subprocess_api.return_value = None + + mgr = MetricsTaskManager() + mgr.enable_sub_process = False + + mgr.remove_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + + mgr.enable_sub_process = True + mgr.executor_map = { + 'host1': { + "storages": ['storage_id1'], + } + } + mgr.scheduler = BackgroundScheduler() + mgr.scheduler.start() + mgr.remove_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + self.assertEqual(mock_subprocess_api.call_count, 1) + + @mock.patch.object(SubprocessAPI, 'assign_failed_job_local') + @mock.patch.object(db, 'failed_task_get') + @mock.patch.object(FailedJobHandler, 'schedule_failed_job') + @mock.patch.object(MetricsTaskManager, 'schedule_boot_jobs') + @mock.patch.object(MetricsTaskManager, 'create_process') + @mock.patch.object(MetricsTaskManager, 'get_local_executor') + def test_metric_manager_assign_failed_job(self, mock_executor, + mock_create, + mock_boot_job, + mock_job_schedule, mock_db, + mock_subprocess_api): + mock_db.return_value = { + 'storage_id': 'storage_id1', + 'args': 'args', + 'interval': 10, + } + mock_create.return_value = None + mock_boot_job.return_value = None + mock_job_schedule.return_value = None + mock_subprocess_api.return_value = None + mock_executor.return_value = None + + mgr = MetricsTaskManager() + mgr.enable_sub_process = False + + mgr.assign_failed_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + + mgr.enable_sub_process = True + mgr.scheduler = BackgroundScheduler() + mgr.scheduler.start() + mgr.assign_failed_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + self.assertEqual(mock_subprocess_api.call_count, 1) + + @mock.patch.object(SubprocessAPI, 
'remove_failed_job_local') + @mock.patch.object(db, 'failed_task_get') + @mock.patch.object(FailedJobHandler, 'remove_failed_job') + @mock.patch.object(MetricsTaskManager, 'schedule_boot_jobs') + @mock.patch.object(MetricsTaskManager, 'create_process') + def test_metric_manager_remove_failed_job(self, mock_create, + mock_boot_job, + mock_job_schedule, mock_db, + mock_subprocess_api): + mock_db.return_value = { + 'storage_id': 'storage_id1', + 'args': 'args', + 'interval': 10, + } + mock_create.return_value = None + mock_boot_job.return_value = None + mock_job_schedule.return_value = None + mock_subprocess_api.return_value = None + + mgr = MetricsTaskManager() + mgr.enable_sub_process = False + + mgr.remove_failed_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + + mgr.enable_sub_process = True + mgr.executor_map = { + 'host1': { + "storages": ['storage_id1'], + } + } + mgr.scheduler = BackgroundScheduler() + mgr.scheduler.start() + mgr.remove_failed_job('context', 'task_id1', 'host1') + self.assertEqual(mock_job_schedule.call_count, 1) + self.assertEqual(mock_subprocess_api.call_count, 1)
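
Note (not part of the patch): a condensed, standalone sketch of the child-executor selection that MetricsTaskManager.get_local_executor() above implements. The DB lookup and the oslo ProcessLauncher call are replaced by a caller-supplied spawn_child callable so the mapping rules are easier to follow; pick_local_executor, spawn_child and the MAX_* constants exist only in this sketch (the constants stand in for the new CONF.telemetry options).

    # Illustrative sketch only -- not the shipped code. It condenses the
    # reuse-before-spawn rules of MetricsTaskManager.get_local_executor():
    # a storage keeps its child, a partially filled child is topped up,
    # and a new child is spawned only when no existing child qualifies.
    MAX_STORAGES_IN_CHILD = 5     # CONF.telemetry.max_storages_in_child
    MAX_CHILDS_IN_NODE = 100000   # CONF.telemetry.max_childs_in_node

    # name -> {"storages": [...], "launcher": ..., "cleanup_delay": int}
    executor_map = {}


    def pick_local_executor(executor, storage_id, spawn_child):
        # 1. The storage is already owned by a child: reuse its topic.
        for name, info in executor_map.items():
            if storage_id in info["storages"]:
                return "{0}:{1}".format(executor, name)
        # 2. A child is active but not yet full: give it the storage.
        for name, info in executor_map.items():
            if 0 < len(info["storages"]) < MAX_STORAGES_IN_CHILD:
                info["storages"].append(storage_id)
                return "{0}:{1}".format(executor, name)
        # 3. Spawn a new child process on the first unused slot.
        for index in range(MAX_CHILDS_IN_NODE):
            name = "executor_{0}".format(index + 1)
            if name not in executor_map:
                topic = "{0}:{1}".format(executor, name)
                executor_map[name] = {"storages": [storage_id],
                                      "launcher": spawn_child(topic),
                                      "cleanup_delay": 0}
                return topic
        raise RuntimeError("Reached maximum number of local executors")

For example, calling pick_local_executor('node-1', 'storage-1', spawn_child=lambda topic: None) on an empty map spawns 'node-1:executor_1'; further storages top that child up to MAX_STORAGES_IN_CHILD before 'node-1:executor_2' is created, which matches the reuse-before-spawn ordering of the method in the diff.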
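
Note (not part of the patch): the subprocess path stays disabled unless it is switched on explicitly. Assuming a standard oslo.config deployment file, enabling it would look roughly like the snippet below; every option name and default value comes from the telemetry_opts additions at the top of this change.

    [telemetry]
    enable_dynamic_subprocess = True
    max_storages_in_child = 5
    max_childs_in_node = 100000
    process_cleanup_interval = 60
    task_cleanup_delay = 10
    group_change_detect_interval = 30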