From d2b4140207d916121b81004d081af5247a1e5907 Mon Sep 17 00:00:00 2001
From: ThisIsClark
Date: Tue, 31 Aug 2021 10:06:24 +0800
Subject: [PATCH] Notify distributor when a new task added (#678)

---
 delfin/api/v1/storages.py                          | 20 ++------
 .../distributor/perf_job_manager.py                | 32 ++++++++++++
 .../distributor/task_distributor.py                | 11 +++++
 delfin/task_manager/metrics_manager.py             |  7 +--
 delfin/task_manager/metrics_rpcapi.py              |  6 +++
 delfin/task_manager/perf_job_controller.py         | 49 +++++++++++++++++++
 .../scheduler/schedule_manager.py                  | 11 +++++
 .../distributor/test_task_distributor.py           | 10 ++++
 8 files changed, 126 insertions(+), 20 deletions(-)
 create mode 100644 delfin/leader_election/distributor/perf_job_manager.py
 create mode 100644 delfin/task_manager/perf_job_controller.py

diff --git a/delfin/api/v1/storages.py b/delfin/api/v1/storages.py
index 24acbdf4b..80061efdf 100755
--- a/delfin/api/v1/storages.py
+++ b/delfin/api/v1/storages.py
@@ -30,6 +30,7 @@
 from delfin.common import constants
 from delfin.drivers import api as driverapi
 from delfin.i18n import _
+from delfin.task_manager import perf_job_controller
 from delfin.task_manager import rpcapi as task_rpcapi
 from delfin.task_manager.tasks import resources
 from delfin.task_manager.tasks import telemetry as task_telemetry
@@ -109,7 +110,7 @@ def create(self, req, body):
             capabilities = self.driver_api.get_capabilities(
                 context=ctxt, storage_id=storage['id'])
             validation.validate_capabilities(capabilities)
-            _create_performance_monitoring_task(ctxt, storage['id'],
+            perf_job_controller.create_perf_job(ctxt, storage['id'],
                                                 capabilities)
         except exception.EmptyResourceMetrics:
             msg = _("Resource metric provided by capabilities is empty for "
@@ -230,7 +231,7 @@
         storage_info = db.storage_get(ctx, id)
 
         # Fetch supported driver's capability
-        capabilities = self.driver_api.\
+        capabilities = self.driver_api. \
             get_capabilities(ctx, storage_info['id'])
 
         # validate capabilities
@@ -265,18 +266,3 @@ def _set_synced_if_ok(context, storage_id, resource_count):
     storage['sync_status'] = resource_count * constants.ResourceSync.START
     storage['updated_at'] = current_time
     db.storage_update(context, storage['id'], storage)
-
-
-def _create_performance_monitoring_task(context, storage_id, capabilities):
-    # Check resource_metric attribute availability and
-    # check if resource_metric is empty
-    if 'resource_metrics' not in capabilities \
-            or not bool(capabilities.get('resource_metrics')):
-        raise exception.EmptyResourceMetrics()
-
-    task = dict()
-    task.update(storage_id=storage_id)
-    task.update(args=capabilities.get('resource_metrics'))
-    task.update(interval=CONF.telemetry.performance_collection_interval)
-    task.update(method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
-    db.task_create(context=context, values=task)
diff --git a/delfin/leader_election/distributor/perf_job_manager.py b/delfin/leader_election/distributor/perf_job_manager.py
new file mode 100644
index 000000000..8acc8ae69
--- /dev/null
+++ b/delfin/leader_election/distributor/perf_job_manager.py
@@ -0,0 +1,32 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from oslo_log import log
+
+from delfin import manager
+from delfin.leader_election.distributor import task_distributor
+
+LOG = log.getLogger(__name__)
+
+
+class PerfJobManager(manager.Manager):
+    """Generate job to job distributor"""
+
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, service_name=None, *args, **kwargs):
+        super(PerfJobManager, self).__init__(*args, **kwargs)
+
+    def add_new_job(self, context, task_id):
+        distributor = task_distributor.TaskDistributor(context)
+        distributor.distribute_new_job(task_id)
diff --git a/delfin/leader_election/distributor/task_distributor.py b/delfin/leader_election/distributor/task_distributor.py
index 5b48ea4dc..6ba7dcaf2 100644
--- a/delfin/leader_election/distributor/task_distributor.py
+++ b/delfin/leader_election/distributor/task_distributor.py
@@ -74,6 +74,17 @@ def __call__(self):
         else:
             LOG.debug("Periodic job distribution completed.")
 
+    def distribute_new_job(self, task_id):
+        executor = CONF.host
+        try:
+            db.task_update(self.ctx, task_id, {'executor': executor})
+            LOG.info('Distribute a new job, id: %s' % task_id)
+            self.task_rpcapi.assign_job(self.ctx, task_id, executor)
+        except Exception as e:
+            LOG.error('Failed to distribute the new job, reason: %s',
+                      six.text_type(e))
+            raise e
+
     @classmethod
     def job_interval(cls):
         return TelemetryCollection.PERIODIC_JOB_INTERVAL
diff --git a/delfin/task_manager/metrics_manager.py b/delfin/task_manager/metrics_manager.py
index ffaaa6e06..746966408 100644
--- a/delfin/task_manager/metrics_manager.py
+++ b/delfin/task_manager/metrics_manager.py
@@ -15,12 +15,13 @@
 periodical task manager for metric collection tasks**
 """
 from oslo_log import log
+
 from delfin import manager
 from delfin.task_manager.scheduler import schedule_manager
-from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
-    import JobHandler
-from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
+from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
     import FailedJobHandler
+from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
+    import JobHandler
 from delfin.task_manager.tasks import telemetry
 
 LOG = log.getLogger(__name__)
diff --git a/delfin/task_manager/metrics_rpcapi.py b/delfin/task_manager/metrics_rpcapi.py
index 9019a2ef1..ea2b96b02 100644
--- a/delfin/task_manager/metrics_rpcapi.py
+++ b/delfin/task_manager/metrics_rpcapi.py
@@ -73,3 +73,9 @@ def remove_failed_job(self, context, failed_task_id, executor):
                                           fanout=True)
         return call_context.cast(context, 'remove_failed_job',
                                  failed_task_id=failed_task_id)
+
+    def create_perf_job(self, context, task_id):
+        rpc_client = self.get_client('JobGenerator')
+        call_context = rpc_client.prepare(topic='JobGenerator', version='1.0')
+        return call_context.cast(context, 'add_new_job',
+                                 task_id=task_id)
diff --git a/delfin/task_manager/perf_job_controller.py b/delfin/task_manager/perf_job_controller.py
new file mode 100644
index 000000000..8b6c173d2
--- /dev/null
+++ b/delfin/task_manager/perf_job_controller.py
@@ -0,0 +1,49 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Client side of the metrics task manager RPC API.
+"""
+
+from oslo_config import cfg
+from oslo_log import log
+
+from delfin import db
+from delfin import exception
+from delfin.common import constants
+from delfin.task_manager import metrics_rpcapi
+
+LOG = log.getLogger(__name__)
+CONF = cfg.CONF
+
+
+def create_perf_job(context, storage_id, capabilities):
+    # Add it to db
+    # Check resource_metric attribute availability and
+    # check if resource_metric is empty
+    if 'resource_metrics' not in capabilities \
+            or not bool(capabilities.get('resource_metrics')):
+        raise exception.EmptyResourceMetrics()
+
+    task = dict()
+    task.update(storage_id=storage_id)
+    task.update(args=capabilities.get('resource_metrics'))
+    task.update(interval=CONF.telemetry.performance_collection_interval)
+    task.update(
+        method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
+    db.task_create(context=context, values=task)
+    # Add it to RabbitMQ
+    filters = {'storage_id': storage_id}
+    task_id = db.task_get_all(context, filters=filters)[0].get('id')
+    metrics_rpcapi.TaskAPI().create_perf_job(context, task_id)
diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py
index 6d2a2c37e..0015e941b 100644
--- a/delfin/task_manager/scheduler/schedule_manager.py
+++ b/delfin/task_manager/scheduler/schedule_manager.py
@@ -21,6 +21,7 @@
 from oslo_utils import uuidutils
 
 from delfin import context
+from delfin import service
 from delfin import utils
 from delfin.leader_election.distributor.failed_task_distributor\
     import FailedTaskDistributor
@@ -76,6 +77,16 @@
                 LOG.error("Failed to initialize periodic tasks, reason: %s.",
                           six.text_type(e))
                 raise e
+        job_generator = service. \
+            TaskService.create(binary='delfin-task',
+                               topic='JobGenerator',
+                               manager='delfin.'
+                                       'leader_election.'
+                                       'distributor.'
+                                       'perf_job_manager.'
+                                       'PerfJobManager',
+                               coordination=True)
+        service.serve(job_generator)
 
     def stop(self):
         """Cleanup periodic jobs"""
diff --git a/delfin/tests/unit/leader_election/distributor/test_task_distributor.py b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py
index 206c0277d..50cb62efc 100644
--- a/delfin/tests/unit/leader_election/distributor/test_task_distributor.py
+++ b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py
@@ -54,3 +54,13 @@ def test_telemetry_job_scheduling(self, mock_assign_job):
         # call telemetry job scheduling
         task_distributor()
         self.assertEqual(mock_assign_job.call_count, 1)
+
+    @mock.patch.object(db, 'task_update')
+    @mock.patch(
+        'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job')
+    def test_distribute_new_job(self, mock_task_update, mock_assign_job):
+        ctx = context.get_admin_context()
+        task_distributor = TaskDistributor(ctx)
+        task_distributor.distribute_new_job('fake_task_id')
+        self.assertEqual(mock_assign_job.call_count, 1)
+        self.assertEqual(mock_task_update.call_count, 1)