diff --git a/delfin/api/v1/storages.py b/delfin/api/v1/storages.py
index 24acbdf4b..61fd9fb95 100755
--- a/delfin/api/v1/storages.py
+++ b/delfin/api/v1/storages.py
@@ -30,9 +30,9 @@
 from delfin.common import constants
 from delfin.drivers import api as driverapi
 from delfin.i18n import _
+from delfin.task_manager import perf_job_controller
 from delfin.task_manager import rpcapi as task_rpcapi
 from delfin.task_manager.tasks import resources
-from delfin.task_manager.tasks import telemetry as task_telemetry
 
 LOG = log.getLogger(__name__)
 CONF = cfg.CONF
@@ -109,7 +109,7 @@ def create(self, req, body):
             capabilities = self.driver_api.get_capabilities(
                 context=ctxt, storage_id=storage['id'])
             validation.validate_capabilities(capabilities)
-            _create_performance_monitoring_task(ctxt, storage['id'],
+            perf_job_controller.create_perf_job(ctxt, storage['id'],
                                                 capabilities)
         except exception.EmptyResourceMetrics:
             msg = _("Resource metric provided by capabilities is empty for "
@@ -134,13 +134,8 @@ def delete(self, req, id):
                 storage['id'],
                 subclass.__module__ + '.' + subclass.__name__)
-        for subclass in task_telemetry.TelemetryTask.__subclasses__():
-            self.task_rpcapi.remove_telemetry_instances(ctxt,
-                                                        storage['id'],
-                                                        subclass.__module__ +
-                                                        '.'
-                                                        + subclass.__name__)
         self.task_rpcapi.remove_storage_in_cache(ctxt, storage['id'])
+        perf_job_controller.delete_perf_job(ctxt, storage['id'])
 
     @wsgi.response(202)
     def sync_all(self, req):
@@ -230,7 +225,7 @@ def get_capabilities(self, req, id):
         storage_info = db.storage_get(ctx, id)
 
         # Fetch supported driver's capability
-        capabilities = self.driver_api.\
+        capabilities = self.driver_api. \
             get_capabilities(ctx, storage_info['id'])
 
         # validate capabilities
@@ -265,18 +260,3 @@ def _set_synced_if_ok(context, storage_id, resource_count):
     storage['sync_status'] = resource_count * constants.ResourceSync.START
     storage['updated_at'] = current_time
     db.storage_update(context, storage['id'], storage)
-
-
-def _create_performance_monitoring_task(context, storage_id, capabilities):
-    # Check resource_metric attribute availability and
-    # check if resource_metric is empty
-    if 'resource_metrics' not in capabilities \
-            or not bool(capabilities.get('resource_metrics')):
-        raise exception.EmptyResourceMetrics()
-
-    task = dict()
-    task.update(storage_id=storage_id)
-    task.update(args=capabilities.get('resource_metrics'))
-    task.update(interval=CONF.telemetry.performance_collection_interval)
-    task.update(method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
-    db.task_create(context=context, values=task)
diff --git a/delfin/cmd/task.py b/delfin/cmd/task.py
index f0e2f3ca2..f15b7aae3 100644
--- a/delfin/cmd/task.py
+++ b/delfin/cmd/task.py
@@ -20,6 +20,7 @@
 """Starter script for delfin task service."""
 
 import eventlet
+
 eventlet.monkey_patch()
 
 import sys
@@ -45,9 +46,18 @@ def main():
     task_server = service.TaskService.create(binary='delfin-task',
                                              coordination=True)
     leader_election = service.LeaderElectionService.create()
+    metrics_task_server = service. \
+        TaskService.create(binary='delfin-task',
+                           topic=CONF.host,
+                           manager='delfin.'
+                                   'task_manager.'
+                                   'metrics_manager.'
+                                   'MetricsTaskManager',
+                           coordination=True)
     service.serve(task_server)
     service.serve(leader_election)
+    service.serve(metrics_task_server)
     service.wait()
diff --git a/delfin/common/config.py b/delfin/common/config.py
index 5d97a80f1..5ef18d371 100644
--- a/delfin/common/config.py
+++ b/delfin/common/config.py
@@ -114,6 +114,11 @@
                default=constants.TelemetryCollection
                .DEF_PERFORMANCE_COLLECTION_INTERVAL,
                help='default interval (in sec) for performance collection'),
+    cfg.IntOpt('performance_history_on_reschedule',
+               default=constants.TelemetryCollection
+               .DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE,
+               help='default history(in sec) to be collected on a job '
+                    'reschedule'),
 ]
 
 CONF.register_opts(telemetry_opts, "telemetry")
diff --git a/delfin/common/constants.py b/delfin/common/constants.py
index 9665d8e6d..4f64b96e2 100644
--- a/delfin/common/constants.py
+++ b/delfin/common/constants.py
@@ -405,6 +405,7 @@ class TelemetryCollection(object):
     MAX_FAILED_JOB_RETRY_COUNT = 5
     """Default performance collection interval"""
     DEF_PERFORMANCE_COLLECTION_INTERVAL = 900
+    DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE = 300
 
 
 class TelemetryTaskStatus(object):
diff --git a/delfin/coordination.py b/delfin/coordination.py
index 0b4756d2e..b45ba834f 100644
--- a/delfin/coordination.py
+++ b/delfin/coordination.py
@@ -15,12 +15,14 @@
 import inspect
 
 import decorator
+import tooz
 from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import uuidutils
 import six
 from tooz import coordination
 from tooz import locking
+from tooz import partitioner
 
 from delfin import cryptor
 from delfin import exception
@@ -320,3 +322,31 @@ def _get_redis_backend_url():
         .format(backend_type=CONF.coordination.backend_type,
                 server=CONF.coordination.backend_server)
     return backend_url
+
+
+class ConsistentHashing(Coordinator):
+    GROUP_NAME = 'partitioner_group'
+
+    def __init__(self):
+        super(ConsistentHashing, self). \
+            __init__(agent_id=CONF.host, prefix="")
+
+    def join_group(self):
+        try:
+            self.coordinator.join_partitioned_group(self.GROUP_NAME)
+        except tooz.coordination.MemberAlreadyExist:
+            LOG.info('Member %s already in partitioner_group' % CONF.host)
+
+    def get_task_executor(self, task_id):
+        part = partitioner.Partitioner(self.coordinator, self.GROUP_NAME)
+        members = part.members_for_object(task_id)
+        for member in members:
+            LOG.info('For task id %s, host should be %s' % (task_id, member))
+            return member.decode('utf-8')
+
+    def register_watcher_func(self, on_node_join, on_node_leave):
+        self.coordinator.watch_join_group(self.GROUP_NAME, on_node_join)
+        self.coordinator.watch_leave_group(self.GROUP_NAME, on_node_leave)
+
+    def watch_group_change(self):
+        self.coordinator.run_watchers()
diff --git a/delfin/db/sqlalchemy/models.py b/delfin/db/sqlalchemy/models.py
index bf1d77a7c..cc8b65eff 100644
--- a/delfin/db/sqlalchemy/models.py
+++ b/delfin/db/sqlalchemy/models.py
@@ -268,6 +268,7 @@ class Task(BASE, DelfinBase):
     args = Column(JsonEncodedDict)
     last_run_time = Column(Integer)
     job_id = Column(String(36))
+    executor = Column(String(255))
     deleted_at = Column(DateTime)
     deleted = Column(Boolean, default=False)
 
@@ -285,6 +286,7 @@ class FailedTask(BASE, DelfinBase):
     method = Column(String(255))
     result = Column(String(255))
     job_id = Column(String(36))
+    executor = Column(String(255))
     deleted_at = Column(DateTime)
     deleted = Column(Boolean, default=False)
 
diff --git a/delfin/leader_election/distributor/__init__.py b/delfin/leader_election/distributor/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/delfin/leader_election/distributor/perf_job_manager.py b/delfin/leader_election/distributor/perf_job_manager.py
new file mode 100644
index 000000000..8acc8ae69
--- /dev/null
+++ b/delfin/leader_election/distributor/perf_job_manager.py
@@ -0,0 +1,32 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from oslo_log import log
+
+from delfin import manager
+from delfin.leader_election.distributor import task_distributor
+
+LOG = log.getLogger(__name__)
+
+
+class PerfJobManager(manager.Manager):
+    """Generate job to job distributor"""
+
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, service_name=None, *args, **kwargs):
+        super(PerfJobManager, self).__init__(*args, **kwargs)
+
+    def add_new_job(self, context, task_id):
+        distributor = task_distributor.TaskDistributor(context)
+        distributor.distribute_new_job(task_id)
diff --git a/delfin/leader_election/distributor/task_distributor.py b/delfin/leader_election/distributor/task_distributor.py
new file mode 100644
index 000000000..892efcd71
--- /dev/null
+++ b/delfin/leader_election/distributor/task_distributor.py
@@ -0,0 +1,62 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import six
+from oslo_config import cfg
+from oslo_log import log
+
+from delfin import db
+from delfin.common.constants import TelemetryCollection
+from delfin.coordination import ConsistentHashing
+from delfin.task_manager import metrics_rpcapi as task_rpcapi
+
+CONF = cfg.CONF
+LOG = log.getLogger(__name__)
+
+
+class TaskDistributor(object):
+    def __init__(self, ctx):
+        self.ctx = ctx
+        self.task_rpcapi = task_rpcapi.TaskAPI()
+
+    def distribute_new_job(self, task_id):
+        partitioner = ConsistentHashing()
+        partitioner.start()
+        executor = partitioner.get_task_executor(task_id)
+        try:
+            db.task_update(self.ctx, task_id, {'executor': executor})
+            LOG.info('Distribute a new job, id: %s' % task_id)
+            self.task_rpcapi.assign_job(self.ctx, task_id, executor)
+        except Exception as e:
+            LOG.error('Failed to distribute the new job, reason: %s',
+                      six.text_type(e))
+            raise e
+
+    def distribute_failed_job(self, failed_task_id, executor):
+
+        try:
+            db.failed_task_update(self.ctx, failed_task_id,
+                                  {'executor': executor})
+            LOG.info('Distribute a failed job, id: %s' % failed_task_id)
+            self.task_rpcapi.assign_failed_job(self.ctx, failed_task_id,
+                                               executor)
+        except Exception as e:
+            LOG.error('Failed to distribute failed job, reason: %s',
+                      six.text_type(e))
+            raise e
+
+    @classmethod
+    def job_interval(cls):
+        return TelemetryCollection.PERIODIC_JOB_INTERVAL
diff --git a/delfin/leader_election/factory.py b/delfin/leader_election/factory.py
index cc80bfaf6..8e3cac86b 100644
--- a/delfin/leader_election/factory.py
+++ b/delfin/leader_election/factory.py
@@ -36,9 +36,10 @@ def construct_elector(plugin, leader_key=None):
     scheduler_mgr = SchedulerManager()
 
     if plugin == "tooz":
+        scheduler_mgr.start()
         # Create callback object
         callback = ToozLeaderElectionCallback.register(
-            on_leading_callback=scheduler_mgr.start,
+            on_leading_callback=scheduler_mgr.schedule_boot_jobs,
            on_stop_callback=scheduler_mgr.stop)
 
         return Elector(callback, leader_election_key)
diff --git a/delfin/task_manager/manager.py b/delfin/task_manager/manager.py
index 63d36db89..348a44445 100644
--- a/delfin/task_manager/manager.py
+++ b/delfin/task_manager/manager.py
@@ -43,15 +43,6 @@ def sync_storage_resource(self, context, storage_id, resource_task):
         device_obj = cls(context, storage_id)
         device_obj.sync()
 
-    def collect_telemetry(self, context, storage_id, telemetry_task,
-                          args, start_time, end_time):
-        LOG.debug("Collecting resource metrics: {0} request for storage"
-                  " id:{1}".format(args, storage_id))
-        cls = importutils.import_class(telemetry_task)
-        device_obj = cls()
-        return device_obj.collect(context, storage_id, args, start_time,
-                                  end_time)
-
     def remove_storage_resource(self, context, storage_id, resource_task):
         cls = importutils.import_class(resource_task)
         device_obj = cls(context, storage_id)
@@ -63,14 +54,6 @@ def remove_storage_in_cache(self, context, storage_id):
         drivers = driver_manager.DriverManager()
         drivers.remove_driver(storage_id)
 
-    def remove_telemetry_instances(self, context, storage_id, telemetry_task):
-        LOG.info('Remove telemetry instances for storage id:{0}')
-        cls = importutils.import_class(telemetry_task)
-        device_obj = cls()
-        return device_obj.remove_telemetry(context,
-                                           storage_id,
-                                           )
-
     def sync_storage_alerts(self, context, storage_id, query_para):
         LOG.info('Alert sync called for storage id:{0}'
                  .format(storage_id))
diff --git a/delfin/task_manager/metrics_manager.py b/delfin/task_manager/metrics_manager.py
new file mode 100644
index 000000000..5ef306db4
--- /dev/null
+++ b/delfin/task_manager/metrics_manager.py
@@ -0,0 +1,60 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+periodical task manager for metric collection tasks**
+"""
+from oslo_log import log
+
+from delfin import manager
+from delfin.coordination import ConsistentHashing
+from delfin.task_manager.scheduler import schedule_manager
+from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
+    import FailedJobHandler
+from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
+    import JobHandler
+from delfin.task_manager.tasks import telemetry
+
+LOG = log.getLogger(__name__)
+
+
+class MetricsTaskManager(manager.Manager):
+    """manage periodical tasks"""
+
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, service_name=None, *args, **kwargs):
+        self.telemetry_task = telemetry.TelemetryTask()
+        super(MetricsTaskManager, self).__init__(*args, **kwargs)
+        scheduler = schedule_manager.SchedulerManager()
+        scheduler.start()
+        JobHandler.schedule_boot_jobs()
+        partitioner = ConsistentHashing()
+        partitioner.start()
+        partitioner.join_group()
+
+    def assign_job(self, context, task_id):
+        instance = JobHandler.get_instance(context, task_id)
+        instance.schedule_job(task_id)
+
+    def remove_job(self, context, task_id):
+        instance = JobHandler.get_instance(context, task_id)
+        instance.remove_job(task_id)
+
+    def assign_failed_job(self, context, failed_task_id):
+        instance = FailedJobHandler.get_instance(context, failed_task_id)
+        instance.schedule_failed_job(failed_task_id)
+
+    def remove_failed_job(self, context, failed_task_id):
+        instance = FailedJobHandler.get_instance(context, failed_task_id)
+        instance.remove_failed_job(failed_task_id)
diff --git a/delfin/task_manager/metrics_rpcapi.py b/delfin/task_manager/metrics_rpcapi.py
new file mode 100644
index 000000000..ea2b96b02
--- /dev/null
+++ b/delfin/task_manager/metrics_rpcapi.py
@@ -0,0 +1,81 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Client side of the metrics task manager RPC API.
+""" + +import oslo_messaging as messaging +from oslo_config import cfg + +from delfin import rpc + +CONF = cfg.CONF + + +class TaskAPI(object): + """Client side of the metrics task rpc API. + + API version history: + + 1.0 - Initial version. + """ + + RPC_API_VERSION = '1.0' + + def __init__(self): + super(TaskAPI, self).__init__() + self.target = messaging.Target(topic=CONF.host, + version=self.RPC_API_VERSION) + self.client = rpc.get_client(self.target, + version_cap=self.RPC_API_VERSION) + + def get_client(self, topic): + target = messaging.Target(topic=topic, + version=self.RPC_API_VERSION) + return rpc.get_client(target, version_cap=self.RPC_API_VERSION) + + def assign_job(self, context, task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'assign_job', + task_id=task_id) + + def remove_job(self, context, task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'remove_job', + task_id=task_id) + + def assign_failed_job(self, context, failed_task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'assign_failed_job', + failed_task_id=failed_task_id) + + def remove_failed_job(self, context, failed_task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'remove_failed_job', + failed_task_id=failed_task_id) + + def create_perf_job(self, context, task_id): + rpc_client = self.get_client('JobGenerator') + call_context = rpc_client.prepare(topic='JobGenerator', version='1.0') + return call_context.cast(context, 'add_new_job', + task_id=task_id) diff --git a/delfin/task_manager/perf_job_controller.py b/delfin/task_manager/perf_job_controller.py new file mode 100644 index 000000000..51c27076b --- /dev/null +++ b/delfin/task_manager/perf_job_controller.py @@ -0,0 +1,67 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Client side of the metrics task manager RPC API. 
+""" + +from oslo_config import cfg +from oslo_log import log + +from delfin import db +from delfin import exception +from delfin.common import constants +from delfin.task_manager import metrics_rpcapi + +LOG = log.getLogger(__name__) +CONF = cfg.CONF + + +def create_perf_job(context, storage_id, capabilities): + # Add it to db + # Check resource_metric attribute availability and + # check if resource_metric is empty + if 'resource_metrics' not in capabilities \ + or not bool(capabilities.get('resource_metrics')): + raise exception.EmptyResourceMetrics() + + task = dict() + task.update(storage_id=storage_id) + task.update(args=capabilities.get('resource_metrics')) + task.update(interval=CONF.telemetry.performance_collection_interval) + task.update( + method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD) + db.task_create(context=context, values=task) + # Add it to RabbitMQ + filters = {'storage_id': storage_id} + task_id = db.task_get_all(context, filters=filters)[0].get('id') + metrics_rpcapi.TaskAPI().create_perf_job(context, task_id) + + +def delete_perf_job(context, storage_id): + # Delete it from scheduler + filters = {'storage_id': storage_id} + tasks = db.task_get_all(context, filters=filters) + failed_tasks = db.failed_task_get_all(context, filters=filters) + for task in tasks: + metrics_rpcapi.TaskAPI().remove_job(context, task.get('id'), + task.get('executor')) + for failed_task in failed_tasks: + metrics_rpcapi.TaskAPI().remove_failed_job(context, + failed_task.get('id'), + failed_task.get('executor')) + + # Soft delete tasks + db.task_delete_by_storage(context, storage_id) + db.failed_task_delete_by_storage(context, storage_id) diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py index e495a5b84..34092b211 100644 --- a/delfin/task_manager/scheduler/schedule_manager.py +++ b/delfin/task_manager/scheduler/schedule_manager.py @@ -17,35 +17,33 @@ import six from apscheduler.schedulers.background import BackgroundScheduler from oslo_log import log -from oslo_utils import importutils from oslo_utils import uuidutils from delfin import context +from delfin import db +from delfin import service from delfin import utils -from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \ - import FailedTelemetryJob -from delfin.task_manager.scheduler.schedulers.telemetry.telemetry_job import \ - TelemetryJob +from delfin.coordination import ConsistentHashing +from delfin.leader_election.distributor.task_distributor \ + import TaskDistributor +from delfin.task_manager import metrics_rpcapi as task_rpcapi LOG = log.getLogger(__name__) -SCHEDULER_BOOT_JOBS = [ - TelemetryJob.__module__ + '.' + TelemetryJob.__name__, - FailedTelemetryJob.__module__ + '.' 
+ FailedTelemetryJob.__name__ -] - @six.add_metaclass(utils.Singleton) class SchedulerManager(object): + + GROUP_CHANGE_DETECT_INTERVAL_SEC = 30 + def __init__(self, scheduler=None): if not scheduler: scheduler = BackgroundScheduler() self.scheduler = scheduler self.scheduler_started = False - - self.boot_jobs = dict() - self.boot_jobs_scheduled = False self.ctx = context.get_admin_context() + self.task_rpcapi = task_rpcapi.TaskAPI() + self.watch_job_id = None def start(self): """ Initialise the schedulers for periodic job creation @@ -54,36 +52,108 @@ def start(self): self.scheduler.start() self.scheduler_started = True - if not self.boot_jobs_scheduled: - try: - for job in SCHEDULER_BOOT_JOBS: - job_class = importutils.import_class(job) - job_instance = job_class(self.ctx) - - # Create a jobs for periodic scheduling - job_id = uuidutils.generate_uuid() - self.scheduler.add_job(job_instance, 'interval', - seconds=job_class.job_interval(), - next_run_time=datetime.now(), - id=job_id) - # book keeping of jobs - self.boot_jobs[job_id] = job_instance - - except Exception as e: - # TODO: Currently failure of scheduler is failing task manager - # start flow, it is logged and ignored. - LOG.error("Failed to initialize periodic tasks, reason: %s.", - six.text_type(e)) - raise e + def on_node_join(self, event): + # A new node joined the group, all the job would be re-distributed. + # If the job is already on the node, it would be ignore and would + # not be scheduled again + LOG.info('Member %s joined the group %s' % (event.member_id, + event.group_id)) + # Get all the jobs + filters = {'deleted': False} + tasks = db.task_get_all(self.ctx, filters=filters) + distributor = TaskDistributor(self.ctx) + partitioner = ConsistentHashing() + partitioner.start() + for task in tasks: + # Get the specific executor + origin_executor = task['executor'] + # If the target executor is different from current executor, + # remove the job from old executor and add it to new executor + new_executor = partitioner.get_task_executor(task['id']) + if new_executor != origin_executor: + LOG.info('Re-distribute job %s from %s to %s' % + (task['id'], origin_executor, new_executor)) + self.task_rpcapi.remove_job(self.ctx, task['id'], + task['executor']) + distributor.distribute_new_job(task['id']) + failed_tasks = db.failed_task_get_all(self.ctx, filters=filters) + for failed_task in failed_tasks: + # Get the parent task executor + task = db.task_get(self.ctx, failed_task['task_id']) + origin_executor = failed_task['executor'] + new_executor = task['executor'] + # If the target executor is different from current executor, + # remove the job from old executor and add it to new executor + if new_executor != origin_executor: + LOG.info('Re-distribute failed_job %s from %s to %s' % + (failed_task['id'], origin_executor, new_executor)) + self.task_rpcapi.remove_job(self.ctx, task['id'], + task['executor']) + distributor.distribute_failed_job(failed_task['id']) + partitioner.stop() + + def on_node_leave(self, event): + LOG.info('Member %s left the group %s' % (event.member_id, + event.group_id)) + filters = {'executor': event.member_id.decode('utf-8'), + 'deleted': False} + re_distribute_tasks = db.task_get_all(self.ctx, filters=filters) + distributor = TaskDistributor(self.ctx) + for task in re_distribute_tasks: + distributor.distribute_new_job(task['id']) + + re_distribute_failed_tasks = db.failed_task_get_all(self.ctx, + filters=filters) + for failed_task in re_distribute_failed_tasks: + task = db.task_get(self.ctx, 
failed_task['task_id']) + executor = task['executor'] + distributor.distribute_failed_job(failed_task['id'], executor) + + def schedule_boot_jobs(self): + # Recover the job in db + self.recover_job() + self.recover_failed_job() + # Start the consumer of job creation message + job_generator = service. \ + TaskService.create(binary='delfin-task', + topic='JobGenerator', + manager='delfin.' + 'leader_election.' + 'distributor.' + 'perf_job_manager.' + 'PerfJobManager', + coordination=True) + service.serve(job_generator) + partitioner = ConsistentHashing() + partitioner.start() + partitioner.register_watcher_func(self.on_node_join, + self.on_node_leave) + self.watch_job_id = uuidutils.generate_uuid() + self.scheduler.add_job(partitioner.watch_group_change, 'interval', + seconds=self.GROUP_CHANGE_DETECT_INTERVAL_SEC, + next_run_time=datetime.now(), + id=self.watch_job_id) def stop(self): """Cleanup periodic jobs""" - - for job_id, job in self.boot_jobs.items(): - self.scheduler.remove_job(job_id) - job.stop() - self.boot_jobs.clear() - self.boot_jobs_scheduled = False + if self.watch_job_id: + self.scheduler.remove_job(self.watch_job_id) def get_scheduler(self): return self.scheduler + + def recover_job(self): + filters = {'deleted': False} + all_tasks = db.task_get_all(self.ctx, filters=filters) + distributor = TaskDistributor(self.ctx) + for task in all_tasks: + distributor.distribute_new_job(task['id']) + + def recover_failed_job(self): + filters = {'deleted': False} + all_failed_tasks = db.failed_task_get_all(self.ctx, filters=filters) + distributor = TaskDistributor(self.ctx) + for failed_task in all_failed_tasks: + task = db.task_get(self.ctx, failed_task['task_id']) + executor = task['executor'] + distributor.distribute_failed_job(failed_task['id'], executor) diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py index 86b1e5793..e1d4e020c 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py @@ -22,9 +22,9 @@ from delfin.db.sqlalchemy.models import FailedTask from delfin.db.sqlalchemy.models import Task from delfin.i18n import _ -from delfin.task_manager import rpcapi as task_rpcapi from delfin.task_manager.scheduler import schedule_manager from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask +from delfin.task_manager import metrics_rpcapi as metrics_task_rpcapi LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -32,7 +32,7 @@ class FailedPerformanceCollectionHandler(object): def __init__(self, ctx, failed_task_id, storage_id, args, job_id, - retry_count, start_time, end_time): + retry_count, start_time, end_time, executor): self.ctx = ctx self.failed_task_id = failed_task_id self.retry_count = retry_count @@ -41,10 +41,11 @@ def __init__(self, ctx, failed_task_id, storage_id, args, job_id, self.args = args self.start_time = start_time self.end_time = end_time - self.task_rpcapi = task_rpcapi.TaskAPI() + self.metrics_task_rpcapi = metrics_task_rpcapi.TaskAPI() self.scheduler_instance = \ schedule_manager.SchedulerManager().get_scheduler() self.result = TelemetryJobStatus.FAILED_JOB_STATUS_INIT + self.executor = executor @staticmethod def get_instance(ctx, failed_task_id): @@ -59,6 +60,7 @@ def get_instance(ctx, failed_task_id): failed_task[FailedTask.retry_count.name], 
             failed_task[FailedTask.start_time.name],
             failed_task[FailedTask.end_time.name],
+            failed_task[FailedTask.executor.name],
         )
 
     def __call__(self):
@@ -77,14 +79,11 @@ def __call__(self):
                       % (self.storage_id, self.failed_task_id))
             return
 
-        # Pull performance collection info
         self.retry_count = self.retry_count + 1
         try:
-            status = self.task_rpcapi.collect_telemetry(
-                self.ctx, self.storage_id,
-                PerformanceCollectionTask.__module__ + '.' +
-                PerformanceCollectionTask.__name__,
-                self.args, self.start_time, self.end_time)
+            telemetry = PerformanceCollectionTask()
+            status = telemetry.collect(self.ctx, self.storage_id, self.args,
+                                       self.start_time, self.end_time)
 
             if not status:
                 raise exception.TelemetryTaskExecError()
@@ -121,4 +120,6 @@ def _stop_task(self):
         db.failed_task_update(self.ctx, self.failed_task_id,
                               {FailedTask.retry_count.name: self.retry_count,
                                FailedTask.result.name: self.result})
-        self.scheduler_instance.pause_job(self.job_id)
+        self.metrics_task_rpcapi.remove_failed_job(self.ctx,
+                                                   self.failed_task_id,
+                                                   self.executor)
diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py b/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py
deleted file mode 100644
index 55945becf..000000000
--- a/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Copyright 2021 The SODA Authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -from datetime import datetime - -import six -from oslo_config import cfg -from oslo_log import log -from oslo_utils import importutils -from oslo_utils import uuidutils - -from delfin import db -from delfin.common.constants import TelemetryJobStatus, TelemetryCollection -from delfin.db.sqlalchemy.models import FailedTask -from delfin.exception import TaskNotFound -from delfin.task_manager.scheduler import schedule_manager - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - - -class FailedTelemetryJob(object): - def __init__(self, ctx): - # create the object of periodic scheduler - self.scheduler = schedule_manager.SchedulerManager().get_scheduler() - self.ctx = ctx - self.stopped = False - self.job_ids = set() - - def __call__(self): - """ - :return: - """ - - if self.stopped: - return - - try: - # Remove jobs from scheduler when marked for delete - filters = {'deleted': True} - failed_tasks = db.failed_task_get_all(self.ctx, filters=filters) - LOG.debug("Total failed_tasks found deleted " - "in this cycle:%s" % len(failed_tasks)) - for failed_task in failed_tasks: - job_id = failed_task['job_id'] - self.remove_scheduled_job(job_id) - db.failed_task_delete(self.ctx, failed_task['id']) - except Exception as e: - LOG.error("Failed to remove periodic scheduling job , reason: %s.", - six.text_type(e)) - try: - # Create the object of periodic scheduler - failed_tasks = db.failed_task_get_all(self.ctx) - - if not len(failed_tasks): - LOG.info("No failed task found for performance collection") - return - - LOG.debug("Schedule performance collection triggered: total " - "failed tasks:%s" % len(failed_tasks)) - - for failed_task in failed_tasks: - failed_task_id = failed_task[FailedTask.id.name] - LOG.info("Processing failed task : %s" % failed_task_id) - - # Get failed jobs, if retry count has reached max, - # remove job and delete db entry - retry_count = failed_task[FailedTask.retry_count.name] - result = failed_task[FailedTask.result.name] - job_id = failed_task[FailedTask.job_id.name] - if retry_count >= \ - TelemetryCollection.MAX_FAILED_JOB_RETRY_COUNT or \ - result == TelemetryJobStatus.FAILED_JOB_STATUS_SUCCESS: - LOG.info("Exiting Failure task processing for task [%d] " - "with result [%s] and retry count [%d] " - % (failed_task_id, result, retry_count)) - # task ID is same as job id - self._teardown_task(self.ctx, failed_task_id, job_id) - continue - - # If job already scheduled, skip - if job_id and self.scheduler.get_job(job_id): - continue - - try: - db.task_get(self.ctx, - failed_task[FailedTask.task_id.name]) - except TaskNotFound as e: - LOG.info("Removing failed telemetry job as parent job " - "do not exist: %s", six.text_type(e)) - # tear down if original task is not available - self._teardown_task(self.ctx, failed_task_id, - job_id) - continue - - if not job_id: - job_id = uuidutils.generate_uuid() - db.failed_task_update(self.ctx, failed_task_id, - {FailedTask.job_id.name: job_id}) - - collection_class = importutils.import_class( - failed_task[FailedTask.method.name]) - instance = \ - collection_class.get_instance(self.ctx, failed_task_id) - self.scheduler.add_job( - instance, 'interval', - seconds=failed_task[FailedTask.interval.name], - next_run_time=datetime.now(), id=job_id, - misfire_grace_time=int( - CONF.telemetry.performance_collection_interval / 2) - ) - self.job_ids.add(job_id) - - except Exception as e: - LOG.error("Failed to schedule retry tasks for performance " - "collection, reason: %s", six.text_type(e)) - else: - LOG.info("Schedule collection completed") - - 
def _teardown_task(self, ctx, failed_task_id, job_id): - db.failed_task_delete(ctx, failed_task_id) - self.remove_scheduled_job(job_id) - - def remove_scheduled_job(self, job_id): - if job_id in self.job_ids: - self.job_ids.remove(job_id) - if job_id and self.scheduler.get_job(job_id): - self.scheduler.remove_job(job_id) - - def stop(self): - self.stopped = True - for job_id in self.job_ids.copy(): - self.remove_scheduled_job(job_id) - - @classmethod - def job_interval(cls): - return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py new file mode 100644 index 000000000..6e8b145c4 --- /dev/null +++ b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py @@ -0,0 +1,265 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +import six +from oslo_config import cfg +from oslo_log import log +from oslo_utils import uuidutils, importutils + +from delfin import db, context +from delfin.common.constants import TelemetryCollection, TelemetryJobStatus +from delfin.exception import TaskNotFound +from delfin.task_manager import rpcapi as task_rpcapi +from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class JobHandler(object): + def __init__(self, ctx, task_id, storage_id, args, interval): + # create an object of periodic task scheduler + self.ctx = ctx + self.task_id = task_id + self.storage_id = storage_id + self.args = args + self.interval = interval + self.task_rpcapi = task_rpcapi.TaskAPI() + self.scheduler = schedule_manager.SchedulerManager().get_scheduler() + self.stopped = False + self.job_ids = set() + + @staticmethod + def get_instance(ctx, task_id): + task = db.task_get(ctx, task_id) + return JobHandler(ctx, task_id, task['storage_id'], + task['args'], task['interval']) + + @staticmethod + def schedule_boot_jobs(): + """Schedule periodic collection if any task is currently assigned to + this executor """ + try: + + filters = {'executor': CONF.host, + 'deleted': False} + ctxt = context.get_admin_context() + tasks = db.task_get_all(ctxt, filters=filters) + failed_tasks = db.failed_task_get_all(ctxt, filters=filters) + LOG.info("Scheduling boot time jobs for this executor: total " + "jobs to be handled :%s" % len(tasks)) + for task in tasks: + instance = JobHandler.get_instance(ctxt, task['id']) + instance.schedule_job(task['id']) + LOG.debug('Periodic collection job assigned for id: ' + '%s ' % task['id']) + for failed_task in failed_tasks: + instance = FailedJobHandler.get_instance(ctxt, + failed_task['id']) + instance.schedule_failed_job(failed_task['id']) + LOG.debug('Failed job assigned for id: ' + '%s ' % failed_task['id']) + + except Exception as e: + LOG.error("Failed to schedule boot jobs for this executor " + "reason: %s.", + 
six.text_type(e)) + else: + LOG.debug("Boot job scheduling completed.") + + def schedule_job(self, task_id): + + if self.stopped: + # If Job is stopped return immediately + return + + LOG.info("JobHandler received A job %s to schedule" % task_id) + job = db.task_get(self.ctx, task_id) + # Check delete status of the task + deleted = job['deleted'] + if deleted: + return + collection_class = importutils.import_class( + job['method']) + instance = collection_class.get_instance(self.ctx, self.task_id) + current_time = int(datetime.now().timestamp()) + last_run_time = current_time + next_collection_time = last_run_time + job['interval'] + job_id = uuidutils.generate_uuid() + next_collection_time = datetime \ + .fromtimestamp(next_collection_time) \ + .strftime('%Y-%m-%d %H:%M:%S') + + existing_job_id = job['job_id'] + + scheduler_job = self.scheduler.get_job(existing_job_id) + + if not (existing_job_id and scheduler_job): + LOG.info('JobHandler scheduling a new job') + if job['last_run_time']: + # Trigger one historic collection to make sure we do not + # miss any Data points due to reschedule + LOG.debug('Triggering one historic collection for job %s', + job['id']) + history_on_reschedule = CONF.telemetry. \ + performance_history_on_reschedule + end_time = current_time * 1000 + start_time = end_time - (history_on_reschedule * 1000) + telemetry = PerformanceCollectionTask() + telemetry.collect(self.ctx, self.storage_id, + self.args, + start_time, end_time) + + db.task_update(self.ctx, self.task_id, + {'last_run_time': last_run_time}) + + self.scheduler.add_job( + instance, 'interval', seconds=job['interval'], + next_run_time=next_collection_time, id=job_id, + misfire_grace_time=int( + CONF.telemetry.performance_collection_interval / 2)) + + update_task_dict = {'job_id': job_id + } + db.task_update(self.ctx, self.task_id, update_task_dict) + self.job_ids.add(job_id) + LOG.info('Periodic collection tasks scheduled for for job id: ' + '%s ' % self.task_id) + else: + LOG.info('Job already exists with this scheduler') + + def stop(self): + self.stopped = True + for job_id in self.job_ids.copy(): + self.remove_scheduled_job(job_id) + LOG.info("Stopping telemetry jobs") + + def remove_scheduled_job(self, job_id): + if job_id in self.job_ids: + self.job_ids.remove(job_id) + if job_id and self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + def remove_job(self, task_id): + try: + LOG.info("Received job %s to remove", task_id) + job = db.task_get(self.ctx, task_id) + job_id = job['job_id'] + self.remove_scheduled_job(job_id) + except Exception as e: + LOG.error("Failed to remove periodic scheduling job , reason: %s.", + six.text_type(e)) + + +class FailedJobHandler(object): + def __init__(self, ctx): + # create an object of periodic failed task scheduler + self.scheduler = schedule_manager.SchedulerManager().get_scheduler() + self.ctx = ctx + self.stopped = False + self.job_ids = set() + + @staticmethod + def get_instance(ctx, failed_task_id): + return FailedJobHandler(ctx) + + def schedule_failed_job(self, failed_task_id): + + if self.stopped: + return + + try: + job = db.failed_task_get(self.ctx, failed_task_id) + retry_count = job['retry_count'] + result = job['result'] + job_id = job['job_id'] + if retry_count >= \ + TelemetryCollection.MAX_FAILED_JOB_RETRY_COUNT or \ + result == TelemetryJobStatus.FAILED_JOB_STATUS_SUCCESS: + LOG.info("Exiting Failure task processing for task [%d] " + "with result [%s] and retry count [%d] " + % (job['id'], result, retry_count)) + 
self._teardown_task(self.ctx, job['id'], job_id) + return + # If job already scheduled, skip + if job_id and self.scheduler.get_job(job_id): + return + + try: + db.task_get(self.ctx, job['task_id']) + except TaskNotFound as e: + LOG.info("Removing failed telemetry job as parent job " + "do not exist: %s", six.text_type(e)) + # tear down if original task is not available + self._teardown_task(self.ctx, job['id'], + job_id) + return + + if not (job_id and self.scheduler.get_job(job_id)): + job_id = uuidutils.generate_uuid() + db.failed_task_update(self.ctx, job['id'], + {'job_id': job_id}) + + collection_class = importutils.import_class( + job['method']) + instance = \ + collection_class.get_instance(self.ctx, job['id']) + self.scheduler.add_job( + instance, 'interval', + seconds=job['interval'], + next_run_time=datetime.now(), id=job_id, + misfire_grace_time=int( + CONF.telemetry.performance_collection_interval / 2) + ) + self.job_ids.add(job_id) + + except Exception as e: + LOG.error("Failed to schedule retry tasks for performance " + "collection, reason: %s", six.text_type(e)) + else: + LOG.info("Schedule collection completed") + + def _teardown_task(self, ctx, failed_task_id, job_id): + db.failed_task_delete(ctx, failed_task_id) + self.remove_scheduled_job(job_id) + + def remove_scheduled_job(self, job_id): + if job_id in self.job_ids: + self.job_ids.remove(job_id) + if job_id and self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + def stop(self): + self.stopped = True + for job_id in self.job_ids.copy(): + self.remove_scheduled_job(job_id) + + def remove_failed_job(self, failed_task_id): + try: + LOG.info("Received failed job %s to remove", failed_task_id) + job = db.failed_task_get(self.ctx, failed_task_id) + job_id = job['job_id'] + self.remove_scheduled_job(job_id) + db.failed_task_delete(self.ctx, job['id']) + LOG.info("Removed failed_task entry %s ", job['id']) + except Exception as e: + LOG.error("Failed to remove periodic scheduling job , reason: %s.", + six.text_type(e)) + + @classmethod + def job_interval(cls): + return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py index d42b31ddf..0a61b7b89 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py @@ -15,35 +15,39 @@ from datetime import datetime import six +from delfin.task_manager.scheduler.schedulers.telemetry. \ + failed_performance_collection_handler import \ + FailedPerformanceCollectionHandler from oslo_log import log from delfin import db from delfin import exception from delfin.common.constants import TelemetryCollection from delfin.db.sqlalchemy.models import FailedTask -from delfin.task_manager import rpcapi as task_rpcapi -from delfin.task_manager.scheduler.schedulers.telemetry. 
\ - failed_performance_collection_handler import \ - FailedPerformanceCollectionHandler -from delfin.task_manager.tasks import telemetry +from delfin.task_manager import metrics_rpcapi as metrics_task_rpcapi +from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask LOG = log.getLogger(__name__) class PerformanceCollectionHandler(object): - def __init__(self, ctx, task_id, storage_id, args, interval): + def __init__(self, ctx, task_id, storage_id, args, interval, executor): self.ctx = ctx self.task_id = task_id self.storage_id = storage_id self.args = args self.interval = interval - self.task_rpcapi = task_rpcapi.TaskAPI() + self.metric_task_rpcapi = metrics_task_rpcapi.TaskAPI() + self.executor = executor + self.scheduler = schedule_manager.SchedulerManager().get_scheduler() @staticmethod def get_instance(ctx, task_id): task = db.task_get(ctx, task_id) return PerformanceCollectionHandler(ctx, task_id, task['storage_id'], - task['args'], task['interval']) + task['args'], task['interval'], + task['executor']) def __call__(self): # Upon periodic job callback, if storage is already deleted or soft @@ -72,11 +76,9 @@ def __call__(self): # Times are epoch time in milliseconds end_time = current_time * 1000 start_time = end_time - (self.interval * 1000) - status = self.task_rpcapi. \ - collect_telemetry(self.ctx, self.storage_id, - telemetry.TelemetryTask.__module__ + '.' + - 'PerformanceCollectionTask', self.args, - start_time, end_time) + telemetry = PerformanceCollectionTask() + status = telemetry.collect(self.ctx, self.storage_id, self.args, + start_time, end_time) db.task_update(self.ctx, self.task_id, {'last_run_time': current_time}) @@ -103,5 +105,9 @@ def _handle_task_failure(self, start_time, end_time): FailedTask.method.name: FailedPerformanceCollectionHandler.__module__ + '.' + FailedPerformanceCollectionHandler.__name__, - FailedTask.retry_count.name: 0} - db.failed_task_create(self.ctx, failed_task) + FailedTask.retry_count.name: 0, + FailedTask.executor.name: self.executor} + failed_task = db.failed_task_create(self.ctx, failed_task) + self.metric_task_rpcapi.assign_failed_job(self.ctx, + failed_task['id'], + failed_task['executor']) diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py b/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py deleted file mode 100644 index cf66b5c3c..000000000 --- a/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from datetime import datetime - -import six -from oslo_config import cfg -from oslo_log import log -from oslo_utils import importutils -from oslo_utils import uuidutils - -from delfin import db -from delfin.common.constants import TelemetryCollection -from delfin.task_manager.scheduler import schedule_manager - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - - -class TelemetryJob(object): - def __init__(self, ctx): - self.ctx = ctx - self.scheduler = schedule_manager.SchedulerManager().get_scheduler() - - # Reset last run time of tasks to restart scheduling and - # start the failed task job - task_list = db.task_get_all(ctx) - for task in task_list: - db.task_update(ctx, task['id'], {'last_run_time': None}) - - self.stopped = False - self.job_ids = set() - - def __call__(self): - """ Schedule the collection tasks based on interval """ - - if self.stopped: - """If Job is stopped return immediately""" - return - - try: - # Remove jobs from scheduler when marked for delete - filters = {'deleted': True} - tasks = db.task_get_all(self.ctx, filters=filters) - LOG.debug("Total tasks found deleted " - "in this cycle:%s" % len(tasks)) - for task in tasks: - job_id = task['job_id'] - if job_id and self.scheduler.get_job(job_id): - self.remove_scheduled_job(job_id) - db.task_delete(self.ctx, task['id']) - except Exception as e: - LOG.error("Failed to remove periodic scheduling job , reason: %s.", - six.text_type(e)) - try: - - filters = {'last_run_time': None} - tasks = db.task_get_all(self.ctx, filters=filters) - LOG.debug("Schedule performance collection triggered: total " - "tasks to be handled:%s" % len(tasks)) - for task in tasks: - # Get current time in epoch format in seconds. Here method - # indicates the specific collection task to be triggered - current_time = int(datetime.now().timestamp()) - last_run_time = current_time - next_collection_time = last_run_time + task['interval'] - task_id = task['id'] - job_id = uuidutils.generate_uuid() - next_collection_time = datetime \ - .fromtimestamp(next_collection_time) \ - .strftime('%Y-%m-%d %H:%M:%S') - - collection_class = importutils.import_class(task['method']) - instance = collection_class.get_instance(self.ctx, task_id) - self.scheduler.add_job( - instance, 'interval', seconds=task['interval'], - next_run_time=next_collection_time, id=job_id, - misfire_grace_time=int( - CONF.telemetry.performance_collection_interval / 2)) - - # jobs book keeping - self.job_ids.add(job_id) - - update_task_dict = {'job_id': job_id, - 'last_run_time': last_run_time} - db.task_update(self.ctx, task_id, update_task_dict) - LOG.info('Periodic collection task triggered for for task id: ' - '%s ' % task['id']) - except Exception as e: - LOG.error("Failed to trigger periodic collection, reason: %s.", - six.text_type(e)) - else: - LOG.debug("Periodic collection task Scheduling completed.") - - def stop(self): - self.stopped = True - for job_id in self.job_ids.copy(): - self.remove_scheduled_job(job_id) - LOG.info("Stopping telemetry jobs") - - @classmethod - def job_interval(cls): - return TelemetryCollection.PERIODIC_JOB_INTERVAL - - def remove_scheduled_job(self, job_id): - if job_id in self.job_ids: - self.job_ids.remove(job_id) - if job_id and self.scheduler.get_job(job_id): - self.scheduler.remove_job(job_id) diff --git a/delfin/task_manager/tasks/telemetry.py b/delfin/task_manager/tasks/telemetry.py index 46b5bc5a1..b915f6828 100644 --- a/delfin/task_manager/tasks/telemetry.py +++ b/delfin/task_manager/tasks/telemetry.py @@ -74,12 +74,3 @@ def collect(self, 
ctx, storage_id, args, start_time, end_time): "storage id :{0}, reason:{1}".format(storage_id, six.text_type(e))) return TelemetryTaskStatus.TASK_EXEC_STATUS_FAILURE - - def remove_telemetry(self, ctx, storage_id): - try: - db.task_delete_by_storage(ctx, storage_id) - db.failed_task_delete_by_storage(ctx, storage_id) - except Exception as e: - LOG.error("Failed to remove task entries from DB for " - "storage id :{0}, reason:{1}".format(storage_id, - six.text_type(e))) diff --git a/delfin/tests/unit/api/v1/test_storages.py b/delfin/tests/unit/api/v1/test_storages.py index 4433ed594..9e46796bb 100644 --- a/delfin/tests/unit/api/v1/test_storages.py +++ b/delfin/tests/unit/api/v1/test_storages.py @@ -27,6 +27,7 @@ class TestStorageController(test.TestCase): def setUp(self): super(TestStorageController, self).setUp() self.task_rpcapi = mock.Mock() + self.metrics_task_rpcapi = mock.Mock() self.driver_api = mock.Mock() self.controller = StorageController() self.mock_object(self.controller, 'task_rpcapi', self.task_rpcapi) @@ -34,15 +35,15 @@ def setUp(self): @mock.patch.object(db, 'storage_get', mock.Mock(return_value={'id': 'fake_id'})) - def test_delete(self): + @mock.patch('delfin.task_manager.perf_job_controller.delete_perf_job') + def test_delete(self, perf_job_controller): req = fakes.HTTPRequest.blank('/storages/fake_id') self.controller.delete(req, 'fake_id') ctxt = req.environ['delfin.context'] db.storage_get.assert_called_once_with(ctxt, 'fake_id') self.task_rpcapi.remove_storage_resource.assert_called_with( ctxt, 'fake_id', mock.ANY) - self.task_rpcapi.remove_telemetry_instances.assert_called_once_with( - ctxt, 'fake_id', mock.ANY) + self.assertEqual(perf_job_controller.call_count, 1) self.task_rpcapi.remove_storage_in_cache.assert_called_once_with( ctxt, 'fake_id') diff --git a/delfin/tests/unit/leader_election/__init__.py b/delfin/tests/unit/leader_election/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/delfin/tests/unit/leader_election/distributor/__init__.py b/delfin/tests/unit/leader_election/distributor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/delfin/tests/unit/leader_election/distributor/test_task_distributor.py b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py new file mode 100644 index 000000000..8f95d4c14 --- /dev/null +++ b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py @@ -0,0 +1,58 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from unittest import mock + +from oslo_utils import uuidutils + +from delfin import context +from delfin import db +from delfin import test +from delfin.common import constants +from delfin.db.sqlalchemy.models import Task +from delfin.leader_election.distributor.task_distributor import TaskDistributor + +fake_telemetry_job = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.last_run_time.name: None, + Task.deleted.name: 0, +} + +fake_telemetry_jobs = [ + fake_telemetry_job, +] + + +class TestTaskDistributor(test.TestCase): + + @mock.patch('delfin.coordination.ConsistentHashing.get_task_executor') + @mock.patch('delfin.coordination.ConsistentHashing.start') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job') + @mock.patch.object(db, 'task_update') + @mock.patch('delfin.coordination.ConsistentHashing.__init__', + mock.Mock(return_value=None)) + def test_distribute_new_job(self, mock_task_update, mock_assign_job, + mock_partitioner_start, + mock_get_task_executor): + ctx = context.get_admin_context() + task_distributor = TaskDistributor(ctx) + task_distributor.distribute_new_job('fake_task_id') + self.assertEqual(mock_assign_job.call_count, 1) + self.assertEqual(mock_task_update.call_count, 1) + self.assertEqual(mock_partitioner_start.call_count, 1) + self.assertEqual(mock_get_task_executor.call_count, 1) diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py index 85e5a5329..f3fc7513e 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py @@ -44,6 +44,7 @@ FailedTask.end_time.name: int(datetime.now().timestamp()) + 20, FailedTask.interval.name: 20, FailedTask.deleted.name: False, + FailedTask.executor.name: 'node1', } fake_deleted_storage_failed_job = { @@ -59,12 +60,14 @@ FailedTask.end_time.name: int(datetime.now().timestamp()) + 20, FailedTask.interval.name: 20, FailedTask.deleted.name: True, + FailedTask.executor.name: 'node1', } fake_telemetry_job = { Task.id.name: 2, Task.storage_id.name: uuidutils.generate_uuid(), Task.args.name: {}, + Task.executor.name: 'node1', } @@ -78,12 +81,12 @@ class TestFailedPerformanceCollectionHandler(test.TestCase): mock.Mock(return_value=fake_telemetry_job)) @mock.patch.object(db, 'failed_task_get', mock.Mock(return_value=fake_failed_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.pause_job') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.remove_failed_job') @mock.patch('delfin.db.failed_task_update') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_failed_job_success(self, mock_collect_telemetry, - mock_failed_task_update, mock_pause_job): + mock_failed_task_update, mock_failed_job): mock_collect_telemetry.return_value = TelemetryTaskStatus. 
\ TASK_EXEC_STATUS_SUCCESS ctx = context.get_admin_context() @@ -92,7 +95,7 @@ def test_failed_job_success(self, mock_collect_telemetry, # call failed job failed_job_handler() - self.assertEqual(mock_pause_job.call_count, 1) + self.assertEqual(mock_failed_job.call_count, 1) mock_failed_task_update.assert_called_once_with( ctx, fake_failed_job_id, @@ -105,12 +108,11 @@ def test_failed_job_success(self, mock_collect_telemetry, mock.Mock(return_value=fake_telemetry_job)) @mock.patch.object(db, 'failed_task_get', mock.Mock(return_value=fake_failed_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.pause_job') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.remove_failed_job') @mock.patch('delfin.db.failed_task_update') @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') def test_failed_job_failure(self, mock_collect_telemetry, - mock_failed_task_update, mock_pause_job): + mock_failed_task_update, mock_failed_job): mock_collect_telemetry.return_value = TelemetryTaskStatus. \ TASK_EXEC_STATUS_FAILURE ctx = context.get_admin_context() @@ -120,7 +122,7 @@ def test_failed_job_failure(self, mock_collect_telemetry, # call failed job failed_job_handler() - self.assertEqual(mock_pause_job.call_count, 0) + self.assertEqual(mock_failed_job.call_count, 0) mock_failed_task_update.assert_called_once_with( ctx, fake_failed_job_id, @@ -132,12 +134,12 @@ def test_failed_job_failure(self, mock_collect_telemetry, @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_telemetry_job)) @mock.patch.object(db, 'failed_task_get') - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.pause_job') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.remove_failed_job') @mock.patch('delfin.db.failed_task_update') @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') def test_failed_job_fail_max_times(self, mock_collect_telemetry, - mock_failed_task_update, mock_pause_job, + mock_failed_task_update, + mock_remove_job, mock_failed_task_get): mock_collect_telemetry.return_value = TelemetryTaskStatus. 
\ TASK_EXEC_STATUS_FAILURE @@ -155,7 +157,7 @@ def test_failed_job_fail_max_times(self, mock_collect_telemetry, # call failed job failed_job_handler() - self.assertEqual(mock_pause_job.call_count, 1) + self.assertEqual(mock_remove_job.call_count, 1) mock_failed_task_update.assert_called_once_with( ctx, fake_failed_job_id, @@ -169,8 +171,7 @@ def test_failed_job_fail_max_times(self, mock_collect_telemetry, mock.Mock(return_value=fake_telemetry_job)) @mock.patch.object(db, 'failed_task_get', mock.Mock(return_value=fake_deleted_storage_failed_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.pause_job') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.remove_failed_job') @mock.patch('delfin.db.failed_task_update') @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') def test_failed_job_deleted_storage(self, mock_collect_telemetry, @@ -189,7 +190,7 @@ def test_failed_job_deleted_storage(self, mock_collect_telemetry, mock.Mock(return_value=fake_telemetry_job)) @mock.patch.object(db, 'failed_task_get', failed_task_not_found_exception) @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.pause_job', + 'delfin.task_manager.metrics_rpcapi.TaskAPI.remove_failed_job', mock.Mock()) @mock.patch('delfin.db.failed_task_update') @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') @@ -198,7 +199,7 @@ def test_deleted_storage_exception(self, mock_collect_telemetry, ctx = context.get_admin_context() failed_job_handler = FailedPerformanceCollectionHandler( ctx, 1122, '12c2d52f-01bc-41f5-b73f-7abf6f38a2a6', '', - 1234, 2, 1122334400, 1122334800) + 1234, 2, 1122334400, 1122334800, 'node1') failed_job_handler() # Verify that no action performed for deleted storage failed tasks diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py similarity index 51% rename from delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py rename to delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py index 3be557dc6..e3ea65123 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py @@ -12,23 +12,70 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime from unittest import mock - +from datetime import datetime from oslo_utils import uuidutils from delfin import context from delfin import db from delfin import test -from delfin.common.constants import TelemetryCollection -from delfin.db.sqlalchemy.models import FailedTask +from delfin.common import constants from delfin.db.sqlalchemy.models import Task +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler import \ + JobHandler +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler import \ + FailedJobHandler +from delfin.db.sqlalchemy.models import FailedTask from delfin.task_manager.scheduler.schedulers.telemetry. 
\ failed_performance_collection_handler import \ FailedPerformanceCollectionHandler -from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \ - import FailedTelemetryJob +from delfin.common.constants import TelemetryCollection + +fake_executor = 'node1' +fake_telemetry_job = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.job_id.name: None, + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.last_run_time.name: None, + Task.executor.name: fake_executor, + Task.deleted.name: False, +} + +fake_telemetry_jobs = [ + fake_telemetry_job, +] + +fake_telemetry_job_deleted = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.last_run_time.name: None, + Task.deleted.name: True, + Task.executor.name: fake_executor, +} + +fake_telemetry_jobs_deleted = [ + fake_telemetry_job_deleted, +] +# With method name as None +Incorrect_telemetry_job = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.method.name: None, + Task.last_run_time.name: None, + Task.executor.name: None, +} +Incorrect_telemetry_jobs = [ + Incorrect_telemetry_job, +] fake_failed_job = { FailedTask.id.name: 43, FailedTask.retry_count.name: 0, @@ -42,17 +89,67 @@ FailedTask.end_time.name: int(datetime.now().timestamp()) + 20, FailedTask.interval.name: 20, FailedTask.deleted.name: False, + FailedTask.executor.name: fake_executor, } fake_failed_jobs = [ fake_failed_job, ] -fake_telemetry_job = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, -} + +class TestTelemetryJob(test.TestCase): + + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs)) + @mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'apscheduler.schedulers.background.BackgroundScheduler.add_job') + def test_telemetry_job_scheduling(self, mock_add_job): + ctx = context.get_admin_context() + telemetry_job = JobHandler(ctx, fake_telemetry_job['id'], + fake_telemetry_job['storage_id'], + fake_telemetry_job['args'], + fake_telemetry_job['interval']) + # call telemetry job scheduling + telemetry_job.schedule_job(fake_telemetry_job['id']) + self.assertEqual(mock_add_job.call_count, 1) + + @mock.patch.object(db, 'task_delete', + mock.Mock()) + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs_deleted)) + @mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'apscheduler.schedulers.background.BackgroundScheduler.add_job', + mock.Mock()) + @mock.patch('logging.LoggerAdapter.error') + def test_telemetry_removal_success(self, mock_log_error): + ctx = context.get_admin_context() + telemetry_job = JobHandler(ctx, fake_telemetry_job['id'], + fake_telemetry_job['storage_id'], + fake_telemetry_job['args'], + fake_telemetry_job['interval']) + # call telemetry job scheduling + telemetry_job.remove_job(fake_telemetry_job['id']) + self.assertEqual(mock_log_error.call_count, 0) + + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs)) + 
@mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'apscheduler.schedulers.background.BackgroundScheduler.add_job') + def test_schedule_boot_jobs(self, mock_add_job): + JobHandler.schedule_boot_jobs() + self.assertEqual(mock_add_job.call_count, 1) class TestFailedTelemetryJob(test.TestCase): @@ -68,11 +165,13 @@ class TestFailedTelemetryJob(test.TestCase): @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.add_job') def test_failed_job_scheduling(self, mock_add_job): - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.schedule_failed_job(fake_failed_job['id']) self.assertEqual(mock_add_job.call_count, 1) + @mock.patch.object(db, 'failed_task_get', + mock.Mock(return_value=fake_failed_job)) @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.remove_job') @mock.patch( @@ -89,15 +188,15 @@ def test_failed_job_with_max_retry(self, mock_failed_get_all, TelemetryCollection.MAX_FAILED_JOB_RETRY_COUNT mock_failed_get_all.return_value = failed_jobs - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.schedule_failed_job(failed_jobs[0]) mock_get_job.return_value = True # entry get deleted and job get removed - self.assertEqual(mock_failed_task_delete.call_count, 2) - self.assertEqual(mock_remove_job.call_count, 2) + self.assertEqual(mock_failed_task_delete.call_count, 1) + self.assertEqual(mock_remove_job.call_count, 1) @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.get_job') @@ -114,13 +213,15 @@ def test_failed_job_with_job_already_scheduled(self, mock_failed_get_all, # configure to have job in scheduler mock_get_job.return_value = failed_jobs - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.remove_failed_job(fake_failed_job['id']) # the job will not be scheduled self.assertEqual(mock_add_job.call_count, 0) + @mock.patch.object(db, 'failed_task_get', + mock.Mock(return_value=fake_failed_job)) @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.remove_job') @mock.patch.object(db, 'failed_task_delete') @@ -133,10 +234,10 @@ def test_failed_job_scheduling_with_no_task(self, mock_failed_get_all, failed_jobs[0][FailedTask.job_id.name] = uuidutils.generate_uuid() mock_failed_get_all.return_value = failed_jobs - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.remove_failed_job(fake_failed_job) # entry get deleted and job get removed - self.assertEqual(mock_failed_task_delete.call_count, 2) + self.assertEqual(mock_failed_task_delete.call_count, 1) self.assertEqual(mock_remove_job.call_count, 0) diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py index a90ed680d..70ee576e2 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py +++ 
b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py @@ -28,6 +28,7 @@ PerformanceCollectionHandler fake_task_id = 43 +fake_executor = 'node1' fake_storage_id = '12c2d52f-01bc-41f5-b73f-7abf6f38a2a6' fake_telemetry_job = { Task.id.name: 2, @@ -35,7 +36,8 @@ Task.args.name: {}, Task.interval.name: 10, Task.deleted.name: False, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.executor.name: fake_executor } fake_deleted_telemetry_job = { @@ -44,7 +46,8 @@ Task.args.name: {}, Task.interval.name: 10, Task.deleted.name: True, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.executor.name: fake_executor } @@ -57,7 +60,8 @@ class TestPerformanceCollectionHandler(test.TestCase): @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_telemetry_job)) @mock.patch('delfin.db.task_update') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_performance_collection_success(self, mock_collect_telemetry, mock_task_update): mock_collect_telemetry.return_value = TelemetryTaskStatus. \ @@ -71,12 +75,17 @@ def test_performance_collection_success(self, mock_collect_telemetry, self.assertEqual(mock_collect_telemetry.call_count, 1) self.assertEqual(mock_task_update.call_count, 1) + @mock.patch('delfin.db.task_update') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.assign_failed_job') @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_telemetry_job)) @mock.patch('delfin.db.failed_task_create') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_performance_collection_failure(self, mock_collect_telemetry, - mock_failed_task_create): + mock_failed_task_create, + mock_assign_failed_job, + mock_task_update): mock_collect_telemetry.return_value = TelemetryTaskStatus. 
\ TASK_EXEC_STATUS_FAILURE ctx = context.get_admin_context() @@ -87,11 +96,14 @@ def test_performance_collection_failure(self, mock_collect_telemetry, # Verify that failed task create is called if collect telemetry fails self.assertEqual(mock_failed_task_create.call_count, 1) + self.assertEqual(mock_assign_failed_job.call_count, 1) + self.assertEqual(mock_task_update.call_count, 1) @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_deleted_telemetry_job)) @mock.patch('delfin.db.task_update') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_performance_collection_deleted_storage(self, mock_collect_telemetry, mock_task_update): @@ -108,14 +120,16 @@ def test_performance_collection_deleted_storage(self, self.assertEqual(mock_task_update.call_count, 0) @mock.patch('delfin.db.task_get', task_not_found_exception) - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_deleted_storage_exception(self, mock_collect_telemetry): ctx = context.get_admin_context() perf_collection_handler = PerformanceCollectionHandler(ctx, fake_task_id, fake_storage_id, - "", 100) + "", 100, + fake_executor) perf_collection_handler() # Verify that collect telemetry for deleted storage diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py deleted file mode 100644 index 54e6600e2..000000000 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
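[Reviewer note, not part of the patch] The updated test_performance_collection_failure above now expects three side effects from a failed collection run: a failed-task record is created, the retry is handed to the metrics task manager, and the parent task row is updated. A hedged sketch of that failure branch, inferred only from the mocked calls (delfin.db.failed_task_create, delfin.task_manager.metrics_rpcapi.TaskAPI.assign_failed_job, delfin.db.task_update); field names and argument order are assumptions.

from delfin import db
from delfin.task_manager import metrics_rpcapi


def on_collection_failure(ctx, task, start_time, end_time):
    """Illustrative sketch only -- not the delfin handler code."""
    failed_task = {
        # Abbreviated; the real FailedTask model carries more columns
        # (retry_count, method, interval, executor, ...).
        'task_id': task['id'],
        'storage_id': task['storage_id'],
        'start_time': start_time,
        'end_time': end_time,
    }
    # 1. Persist a failed-task record so the window can be retried later.
    db.failed_task_create(ctx, failed_task)
    # 2. Ask the metrics task manager to schedule the retry on the node that
    #    owns this storage (argument list is a guess; only the call is asserted).
    metrics_rpcapi.TaskAPI().assign_failed_job(ctx, failed_task, task['executor'])
    # 3. Stamp the parent task so the scheduler records this attempt.
    db.task_update(ctx, task['id'], {'last_run_time': start_time})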
- -from unittest import mock - -from oslo_utils import uuidutils - -from delfin import context -from delfin import db -from delfin import test -from delfin.common import constants -from delfin.db.sqlalchemy.models import Task -from delfin.task_manager.scheduler.schedulers.telemetry.telemetry_job import \ - TelemetryJob - -fake_telemetry_job = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, - Task.interval.name: 10, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, - Task.last_run_time.name: None, -} - -fake_telemetry_jobs = [ - fake_telemetry_job, -] - -fake_telemetry_job_deleted = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, - Task.interval.name: 10, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, - Task.last_run_time.name: None, - Task.deleted.name: True, -} - -fake_telemetry_jobs_deleted = [ - fake_telemetry_job_deleted, -] -# With method name as None -Incorrect_telemetry_job = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, - Task.interval.name: 10, - Task.method.name: None, - Task.last_run_time.name: None, -} - -Incorrect_telemetry_jobs = [ - Incorrect_telemetry_job, -] - - -class TestTelemetryJob(test.TestCase): - - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=fake_telemetry_jobs)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job') - def test_telemetry_job_scheduling(self, mock_add_job): - ctx = context.get_admin_context() - telemetry_job = TelemetryJob(ctx) - # call telemetry job scheduling - telemetry_job() - self.assertEqual(mock_add_job.call_count, 1) - - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=Incorrect_telemetry_jobs)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=Incorrect_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=Incorrect_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job', - mock.Mock()) - @mock.patch('logging.LoggerAdapter.error') - def test_telemetry_job_scheduling_exception(self, mock_log_error): - ctx = context.get_admin_context() - telemetry_job = TelemetryJob(ctx) - # call telemetry job scheduling - telemetry_job() - self.assertEqual(mock_log_error.call_count, 2) - - @mock.patch.object(db, 'task_delete', - mock.Mock()) - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=fake_telemetry_jobs_deleted)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job', - mock.Mock()) - @mock.patch('logging.LoggerAdapter.error') - def test_telemetry_removal_success(self, mock_log_error): - ctx = context.get_admin_context() - telemetry_job = TelemetryJob(ctx) - # call telemetry job scheduling - telemetry_job() - self.assertEqual(mock_log_error.call_count, 1) diff --git a/delfin/tests/unit/task_manager/scheduler/test_scheduler.py b/delfin/tests/unit/task_manager/scheduler/test_scheduler.py index 0460e67d9..4088b4ee2 100644 --- a/delfin/tests/unit/task_manager/scheduler/test_scheduler.py +++ b/delfin/tests/unit/task_manager/scheduler/test_scheduler.py @@ 
-12,11 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. +from unittest import mock + from apscheduler.schedulers.background import BackgroundScheduler +from delfin import db from delfin import test +from delfin.coordination import ConsistentHashing +from delfin.leader_election.distributor.task_distributor \ + import TaskDistributor +from delfin.task_manager.metrics_rpcapi import TaskAPI from delfin.task_manager.scheduler import schedule_manager +FAKE_TASKS = [ + { + 'id': 1, + 'executor': 'node1' + }, + { + 'id': 2, + 'executor': 'node2' + }, + { + 'id': 3, + 'executor': 'node1' + } +] + class TestScheduler(test.TestCase): @@ -28,3 +50,56 @@ def test_scheduler_manager_singleton(self): self.assertIsInstance(second_instance, BackgroundScheduler) self.assertEqual(first_instance, second_instance) + + @mock.patch.object(BackgroundScheduler, 'start') + def test_start(self, mock_scheduler_start): + manager = schedule_manager.SchedulerManager() + manager.start() + self.assertEqual(mock_scheduler_start.call_count, 1) + manager.start() + self.assertEqual(mock_scheduler_start.call_count, 1) + + @mock.patch('tooz.coordination.get_coordinator', mock.Mock()) + @mock.patch.object(ConsistentHashing, 'get_task_executor') + @mock.patch.object(TaskAPI, 'remove_job') + @mock.patch.object(TaskDistributor, 'distribute_new_job') + @mock.patch.object(db, 'task_get_all') + def test_on_node_join(self, mock_task_get_all, mock_distribute_new_job, + mock_remove_job, mock_get_task_executor): + node1_job_count = 0 + node2_job_count = 0 + for job in FAKE_TASKS: + if job['executor'] == 'node1': + node1_job_count += 1 + elif job['executor'] == 'node2': + node2_job_count += 1 + mock_task_get_all.return_value = FAKE_TASKS + mock_get_task_executor.return_value = 'node1' + manager = schedule_manager.SchedulerManager() + manager.on_node_join(mock.Mock(member_id=b'fake_member_id', + group_id='node1')) + self.assertEqual(mock_task_get_all.call_count, 1) + self.assertEqual(mock_distribute_new_job.call_count, + node1_job_count + node2_job_count) + self.assertEqual(mock_remove_job.call_count, node2_job_count) + self.assertEqual(mock_get_task_executor.call_count, + node1_job_count + node2_job_count) + + @mock.patch.object(TaskDistributor, 'distribute_new_job') + @mock.patch.object(db, 'task_get_all') + def test_on_node_leave(self, mock_task_get_all, mock_distribute_new_job): + mock_task_get_all.return_value = FAKE_TASKS + manager = schedule_manager.SchedulerManager() + manager.on_node_leave(mock.Mock(member_id=b'fake_member_id', + group_id='fake_group_id')) + self.assertEqual(mock_task_get_all.call_count, 1) + self.assertEqual(mock_distribute_new_job.call_count, len(FAKE_TASKS)) + + @mock.patch.object(TaskDistributor, 'distribute_new_job') + @mock.patch.object(db, 'task_get_all') + def test_recover_job(self, mock_task_get_all, mock_distribute_new_job): + mock_task_get_all.return_value = FAKE_TASKS + manager = schedule_manager.SchedulerManager() + manager.recover_job() + self.assertEqual(mock_task_get_all.call_count, 1) + self.assertEqual(mock_distribute_new_job.call_count, len(FAKE_TASKS)) diff --git a/delfin/tests/unit/task_manager/test_telemetry.py b/delfin/tests/unit/task_manager/test_telemetry.py index 3450f80ff..f392f422f 100644 --- a/delfin/tests/unit/task_manager/test_telemetry.py +++ b/delfin/tests/unit/task_manager/test_telemetry.py @@ -70,16 +70,3 @@ def test_performance_collection_failure(self, mock_collect_perf_metrics, # when collect metric fails 
self.assertEqual(mock_dispatch.call_count, 0) self.assertEqual(mock_log_error.call_count, 1) - - @mock.patch('delfin.db.failed_task_delete_by_storage') - @mock.patch('delfin.db.task_delete_by_storage') - def test_successful_remove(self, mock_task_del, mock_failed_task_del): - telemetry_obj = telemetry.PerformanceCollectionTask( - ) - telemetry_obj.remove_telemetry( - context, 'c5c91c98-91aa-40e6-85ac-37a1d3b32bda') - - mock_task_del.assert_called_with( - context, 'c5c91c98-91aa-40e6-85ac-37a1d3b32bda') - mock_failed_task_del.assert_called_with( - context, 'c5c91c98-91aa-40e6-85ac-37a1d3b32bda') diff --git a/delfin/tests/unit/test_coordination.py b/delfin/tests/unit/test_coordination.py index 9f37896bf..6ae27cdce 100644 --- a/delfin/tests/unit/test_coordination.py +++ b/delfin/tests/unit/test_coordination.py @@ -118,3 +118,33 @@ def func(foo, bar): bar.__getitem__.return_value = 8 func(foo, bar) get_lock.assert_called_with('lock-func-7-8') + + +class ConsistentHashingTestCase(test.TestCase): + + def setUp(self): + super(ConsistentHashingTestCase, self).setUp() + self.get_coordinator = self.mock_object(tooz_coordination, + 'get_coordinator') + + def test_join_group(self): + crd = self.get_coordinator.return_value + part = coordination.ConsistentHashing() + part.start() + part.join_group() + self.assertTrue(crd.join_partitioned_group.called) + + def test_register_watcher_func(self): + crd = self.get_coordinator.return_value + part = coordination.ConsistentHashing() + part.start() + part.register_watcher_func(mock.Mock(), mock.Mock()) + self.assertTrue(crd.watch_join_group.called) + self.assertTrue(crd.watch_leave_group.called) + + def test_watch_group_change(self): + crd = self.get_coordinator.return_value + part = coordination.ConsistentHashing() + part.start() + part.watch_group_change() + self.assertTrue(crd.run_watchers.called) diff --git a/requirements.txt b/requirements.txt index 167f80b36..614956834 100644 --- a/requirements.txt +++ b/requirements.txt @@ -40,3 +40,5 @@ APScheduler~=3.6.3 flask kafka-python importlib-metadata==3.7.0; python_version < "3.8" +tenacity==6.3.1 +tzlocal<3.0
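[Reviewer note, not part of the patch] The new ConsistentHashingTestCase above only asserts which tooz calls are forwarded (join_partitioned_group, watch_join_group/watch_leave_group, run_watchers). For context, here is a minimal standalone sketch of the underlying tooz partitioner API, showing how a partitioned group maps an object such as a task id onto the group member that should execute it; the backend URL, group name, and member id are placeholders, and this is not delfin's ConsistentHashing class.

from tooz import coordination

BACKEND_URL = 'redis://127.0.0.1:6379'   # placeholder; any tooz backend works
GROUP_ID = b'example_partitioner_group'  # placeholder group name
MEMBER_ID = b'node1'                     # this process's identity in the group

coord = coordination.get_coordinator(BACKEND_URL, MEMBER_ID)
coord.start(start_heart=True)

# Joining a *partitioned* group returns a Partitioner that maintains a
# consistent-hash ring over the current group members.
partitioner = coord.join_partitioned_group(GROUP_ID)

# Any object (e.g. a task id) hashes to the member(s) that own it; this is
# the primitive behind picking an executor node for a telemetry task.
print('owned by:', partitioner.members_for_object('fake_task_id'))

# Membership changes are delivered through watchers; run_watchers() has to be
# polled periodically (the watch_group_change test wraps exactly this call).
coord.watch_join_group(GROUP_ID, lambda event: print('joined:', event.member_id))
coord.watch_leave_group(GROUP_ID, lambda event: print('left:', event.member_id))
coord.run_watchers()

coord.stop()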