Skip to content

Commit

Permalink
Notify distributor when a new task added (#678)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThisIsClark authored Aug 31, 2021
1 parent 903138a commit d2b4140
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 20 deletions.
20 changes: 3 additions & 17 deletions delfin/api/v1/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from delfin.common import constants
from delfin.drivers import api as driverapi
from delfin.i18n import _
from delfin.task_manager import perf_job_controller
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager.tasks import resources
from delfin.task_manager.tasks import telemetry as task_telemetry
Expand Down Expand Up @@ -109,7 +110,7 @@ def create(self, req, body):
capabilities = self.driver_api.get_capabilities(
context=ctxt, storage_id=storage['id'])
validation.validate_capabilities(capabilities)
_create_performance_monitoring_task(ctxt, storage['id'],
perf_job_controller.create_perf_job(ctxt, storage['id'],
capabilities)
except exception.EmptyResourceMetrics:
msg = _("Resource metric provided by capabilities is empty for "
Expand Down Expand Up @@ -230,7 +231,7 @@ def get_capabilities(self, req, id):
storage_info = db.storage_get(ctx, id)

# Fetch supported driver's capability
capabilities = self.driver_api.\
capabilities = self.driver_api. \
get_capabilities(ctx, storage_info['id'])

# validate capabilities
Expand Down Expand Up @@ -265,18 +266,3 @@ def _set_synced_if_ok(context, storage_id, resource_count):
storage['sync_status'] = resource_count * constants.ResourceSync.START
storage['updated_at'] = current_time
db.storage_update(context, storage['id'], storage)


def _create_performance_monitoring_task(context, storage_id, capabilities):
# Check resource_metric attribute availability and
# check if resource_metric is empty
if 'resource_metrics' not in capabilities \
or not bool(capabilities.get('resource_metrics')):
raise exception.EmptyResourceMetrics()

task = dict()
task.update(storage_id=storage_id)
task.update(args=capabilities.get('resource_metrics'))
task.update(interval=CONF.telemetry.performance_collection_interval)
task.update(method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
db.task_create(context=context, values=task)
32 changes: 32 additions & 0 deletions delfin/leader_election/distributor/perf_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log

from delfin import manager
from delfin.leader_election.distributor import task_distributor

LOG = log.getLogger(__name__)


class PerfJobManager(manager.Manager):
"""Generate job to job distributor"""

RPC_API_VERSION = '1.0'

def __init__(self, service_name=None, *args, **kwargs):
super(PerfJobManager, self).__init__(*args, **kwargs)

def add_new_job(self, context, task_id):
distributor = task_distributor.TaskDistributor(context)
distributor.distribute_new_job(task_id)
11 changes: 11 additions & 0 deletions delfin/leader_election/distributor/task_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ def __call__(self):
else:
LOG.debug("Periodic job distribution completed.")

def distribute_new_job(self, task_id):
executor = CONF.host
try:
db.task_update(self.ctx, task_id, {'executor': executor})
LOG.info('Distribute a new job, id: %s' % task_id)
self.task_rpcapi.assign_job(self.ctx, task_id, executor)
except Exception as e:
LOG.error('Failed to distribute the new job, reason: %s',
six.text_type(e))
raise e

@classmethod
def job_interval(cls):
return TelemetryCollection.PERIODIC_JOB_INTERVAL
7 changes: 4 additions & 3 deletions delfin/task_manager/metrics_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
periodical task manager for metric collection tasks**
"""
from oslo_log import log

from delfin import manager
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
import JobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import FailedJobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import JobHandler
from delfin.task_manager.tasks import telemetry

LOG = log.getLogger(__name__)
Expand Down
6 changes: 6 additions & 0 deletions delfin/task_manager/metrics_rpcapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ def remove_failed_job(self, context, failed_task_id, executor):
fanout=True)
return call_context.cast(context, 'remove_failed_job',
failed_task_id=failed_task_id)

def create_perf_job(self, context, task_id):
rpc_client = self.get_client('JobGenerator')
call_context = rpc_client.prepare(topic='JobGenerator', version='1.0')
return call_context.cast(context, 'add_new_job',
task_id=task_id)
49 changes: 49 additions & 0 deletions delfin/task_manager/perf_job_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Client side of the metrics task manager RPC API.
"""

from oslo_config import cfg
from oslo_log import log

from delfin import db
from delfin import exception
from delfin.common import constants
from delfin.task_manager import metrics_rpcapi

LOG = log.getLogger(__name__)
CONF = cfg.CONF


def create_perf_job(context, storage_id, capabilities):
# Add it to db
# Check resource_metric attribute availability and
# check if resource_metric is empty
if 'resource_metrics' not in capabilities \
or not bool(capabilities.get('resource_metrics')):
raise exception.EmptyResourceMetrics()

task = dict()
task.update(storage_id=storage_id)
task.update(args=capabilities.get('resource_metrics'))
task.update(interval=CONF.telemetry.performance_collection_interval)
task.update(
method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
db.task_create(context=context, values=task)
# Add it to RabbitMQ
filters = {'storage_id': storage_id}
task_id = db.task_get_all(context, filters=filters)[0].get('id')
metrics_rpcapi.TaskAPI().create_perf_job(context, task_id)
11 changes: 11 additions & 0 deletions delfin/task_manager/scheduler/schedule_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from oslo_utils import uuidutils

from delfin import context
from delfin import service
from delfin import utils
from delfin.leader_election.distributor.failed_task_distributor\
import FailedTaskDistributor
Expand Down Expand Up @@ -76,6 +77,16 @@ def schedule_boot_jobs(self):
LOG.error("Failed to initialize periodic tasks, reason: %s.",
six.text_type(e))
raise e
job_generator = service. \
TaskService.create(binary='delfin-task',
topic='JobGenerator',
manager='delfin.'
'leader_election.'
'distributor.'
'perf_job_manager.'
'PerfJobManager',
coordination=True)
service.serve(job_generator)

def stop(self):
"""Cleanup periodic jobs"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,13 @@ def test_telemetry_job_scheduling(self, mock_assign_job):
# call telemetry job scheduling
task_distributor()
self.assertEqual(mock_assign_job.call_count, 1)

@mock.patch.object(db, 'task_update')
@mock.patch(
'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job')
def test_distribute_new_job(self, mock_task_update, mock_assign_job):
ctx = context.get_admin_context()
task_distributor = TaskDistributor(ctx)
task_distributor.distribute_new_job('fake_task_id')
self.assertEqual(mock_assign_job.call_count, 1)
self.assertEqual(mock_task_update.call_count, 1)

0 comments on commit d2b4140

Please sign in to comment.