Syncing perf_coll_fw_enhance feature branch to master (#692)
* Make job scheduler local to task process (#674)

* Make job scheduler local to task process

* Notify distributor when a new task added (#678)

* Remove db-scan for new task creation (#680)

* Use consistent hash to manage the topic (#681)

* Remove the periodically call from task distributor (#686)

* Start one historic collection immediately when a job is rescheduled (#685)

* Start one historic collection immediately when a job is rescheduled

* Remove failed task distributor (#687)

* Improve failed job handling and telemetry job removal (#689)

Co-authored-by: ThisIsClark <liuyuchibubao@gmail.com>
Co-authored-by: Ashit Kumar <akopensrc@gmail.com>
3 people authored Sep 14, 2021
1 parent cdbaf38 commit e73f251
Showing 33 changed files with 1,095 additions and 566 deletions.
28 changes: 4 additions & 24 deletions delfin/api/v1/storages.py
@@ -30,9 +30,9 @@
from delfin.common import constants
from delfin.drivers import api as driverapi
from delfin.i18n import _
from delfin.task_manager import perf_job_controller
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager.tasks import resources
from delfin.task_manager.tasks import telemetry as task_telemetry

LOG = log.getLogger(__name__)
CONF = cfg.CONF
@@ -109,7 +109,7 @@ def create(self, req, body):
capabilities = self.driver_api.get_capabilities(
context=ctxt, storage_id=storage['id'])
validation.validate_capabilities(capabilities)
_create_performance_monitoring_task(ctxt, storage['id'],
perf_job_controller.create_perf_job(ctxt, storage['id'],
capabilities)
except exception.EmptyResourceMetrics:
msg = _("Resource metric provided by capabilities is empty for "
@@ -134,13 +134,8 @@ def delete(self, req, id):
storage['id'],
subclass.__module__ + '.' + subclass.__name__)

for subclass in task_telemetry.TelemetryTask.__subclasses__():
self.task_rpcapi.remove_telemetry_instances(ctxt,
storage['id'],
subclass.__module__ +
'.'
+ subclass.__name__)
self.task_rpcapi.remove_storage_in_cache(ctxt, storage['id'])
perf_job_controller.delete_perf_job(ctxt, storage['id'])

@wsgi.response(202)
def sync_all(self, req):
@@ -230,7 +225,7 @@ def get_capabilities(self, req, id):
storage_info = db.storage_get(ctx, id)

# Fetch supported driver's capability
capabilities = self.driver_api.\
capabilities = self.driver_api. \
get_capabilities(ctx, storage_info['id'])

# validate capabilities
@@ -265,18 +260,3 @@ def _set_synced_if_ok(context, storage_id, resource_count):
storage['sync_status'] = resource_count * constants.ResourceSync.START
storage['updated_at'] = current_time
db.storage_update(context, storage['id'], storage)


def _create_performance_monitoring_task(context, storage_id, capabilities):
# Check resource_metric attribute availability and
# check if resource_metric is empty
if 'resource_metrics' not in capabilities \
or not bool(capabilities.get('resource_metrics')):
raise exception.EmptyResourceMetrics()

task = dict()
task.update(storage_id=storage_id)
task.update(args=capabilities.get('resource_metrics'))
task.update(interval=CONF.telemetry.performance_collection_interval)
task.update(method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
db.task_create(context=context, values=task)
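
The new delfin/task_manager/perf_job_controller.py that these call sites rely on is among the 33 changed files, but its diff is not rendered in this excerpt. A minimal sketch of what it plausibly contains, assuming it simply relocates the removed _create_performance_monitoring_task logic; only the create_perf_job/delete_perf_job signatures come from the call sites above, everything else is an assumption:

from oslo_config import cfg

from delfin import db
from delfin import exception
from delfin.common import constants

CONF = cfg.CONF


def create_perf_job(context, storage_id, capabilities):
    # Same guard as the removed helper: reject drivers that report an empty
    # resource_metrics capability.
    if not capabilities.get('resource_metrics'):
        raise exception.EmptyResourceMetrics()

    task = dict(storage_id=storage_id,
                args=capabilities.get('resource_metrics'),
                interval=CONF.telemetry.performance_collection_interval,
                method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
    db.task_create(context=context, values=task)
    # With #678/#680 the new task is then pushed to the distributor instead
    # of being picked up by a periodic db scan (see PerfJobManager below).


def delete_perf_job(context, storage_id):
    # Sketch only: the real controller is expected to remove the storage's
    # telemetry tasks and let the metrics task manager unschedule them; that
    # logic is not shown in this excerpt.
    pass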
10 changes: 10 additions & 0 deletions delfin/cmd/task.py
@@ -20,6 +20,7 @@
"""Starter script for delfin task service."""

import eventlet

eventlet.monkey_patch()

import sys
@@ -45,9 +46,18 @@ def main():
task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()
metrics_task_server = service. \
TaskService.create(binary='delfin-task',
topic=CONF.host,
manager='delfin.'
'task_manager.'
'metrics_manager.'
'MetricsTaskManager',
coordination=True)

service.serve(task_server)
service.serve(leader_election)
service.serve(metrics_task_server)

service.wait()

5 changes: 5 additions & 0 deletions delfin/common/config.py
@@ -114,6 +114,11 @@
default=constants.TelemetryCollection
.DEF_PERFORMANCE_COLLECTION_INTERVAL,
help='default interval (in sec) for performance collection'),
cfg.IntOpt('performance_history_on_reschedule',
default=constants.TelemetryCollection
.DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE,
help='default history (in sec) to be collected on a job '
'reschedule'),
]

CONF.register_opts(telemetry_opts, "telemetry")
1 change: 1 addition & 0 deletions delfin/common/constants.py
@@ -405,6 +405,7 @@ class TelemetryCollection(object):
MAX_FAILED_JOB_RETRY_COUNT = 5
"""Default performance collection interval"""
DEF_PERFORMANCE_COLLECTION_INTERVAL = 900
DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE = 300


class TelemetryTaskStatus(object):
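
Per #685, a rescheduled job now also triggers one immediate historic collection, bounded by the new performance_history_on_reschedule option (default 300 seconds). A hedged illustration of how that window could be derived; the helper name and the millisecond epoch convention are assumptions, not code from this change:

import time

from oslo_config import cfg

CONF = cfg.CONF


def historic_collection_window():
    # End the catch-up collection at "now" and start it
    # performance_history_on_reschedule seconds earlier (defaults to
    # DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE = 300, per the diff above).
    end_time = int(time.time()) * 1000
    start_time = end_time - \
        CONF.telemetry.performance_history_on_reschedule * 1000
    return start_time, end_time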
30 changes: 30 additions & 0 deletions delfin/coordination.py
@@ -15,12 +15,14 @@
import inspect

import decorator
import tooz
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
import six
from tooz import coordination
from tooz import locking
from tooz import partitioner

from delfin import cryptor
from delfin import exception
@@ -320,3 +322,31 @@ def _get_redis_backend_url():
.format(backend_type=CONF.coordination.backend_type,
server=CONF.coordination.backend_server)
return backend_url


class ConsistentHashing(Coordinator):
GROUP_NAME = 'partitioner_group'

def __init__(self):
super(ConsistentHashing, self). \
__init__(agent_id=CONF.host, prefix="")

def join_group(self):
try:
self.coordinator.join_partitioned_group(self.GROUP_NAME)
except tooz.coordination.MemberAlreadyExist:
LOG.info('Member %s already in partitioner_group' % CONF.host)

def get_task_executor(self, task_id):
part = partitioner.Partitioner(self.coordinator, self.GROUP_NAME)
members = part.members_for_object(task_id)
for member in members:
LOG.info('For task id %s, host should be %s' % (task_id, member))
return member.decode('utf-8')

def register_watcher_func(self, on_node_join, on_node_leave):
self.coordinator.watch_join_group(self.GROUP_NAME, on_node_join)
self.coordinator.watch_leave_group(self.GROUP_NAME, on_node_leave)

def watch_group_change(self):
self.coordinator.run_watchers()
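
A minimal usage sketch of the new ConsistentHashing helper, assuming a tooz coordination backend is configured and CONF.host identifies the local node:

from delfin.coordination import ConsistentHashing

# On a task-executor node (see MetricsTaskManager below) the process joins
# the partitioned group so it can be assigned a share of the hash ring.
partitioner = ConsistentHashing()
partitioner.start()        # connect to the configured coordination backend
partitioner.join_group()   # join 'partitioner_group'

# Any node can then ask which member owns a given task id; the mapping is
# stable for a fixed membership, so every caller resolves the same executor.
executor_host = partitioner.get_task_executor('<task-id>')  # placeholder id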
2 changes: 2 additions & 0 deletions delfin/db/sqlalchemy/models.py
@@ -268,6 +268,7 @@ class Task(BASE, DelfinBase):
args = Column(JsonEncodedDict)
last_run_time = Column(Integer)
job_id = Column(String(36))
executor = Column(String(255))
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)

@@ -285,6 +286,7 @@ class FailedTask(BASE, DelfinBase):
method = Column(String(255))
result = Column(String(255))
job_id = Column(String(36))
executor = Column(String(255))
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)

Empty file.
32 changes: 32 additions & 0 deletions delfin/leader_election/distributor/perf_job_manager.py
@@ -0,0 +1,32 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log

from delfin import manager
from delfin.leader_election.distributor import task_distributor

LOG = log.getLogger(__name__)


class PerfJobManager(manager.Manager):
"""Generate job to job distributor"""

RPC_API_VERSION = '1.0'

def __init__(self, service_name=None, *args, **kwargs):
super(PerfJobManager, self).__init__(*args, **kwargs)

def add_new_job(self, context, task_id):
distributor = task_distributor.TaskDistributor(context)
distributor.distribute_new_job(task_id)
62 changes: 62 additions & 0 deletions delfin/leader_election/distributor/task_distributor.py
@@ -0,0 +1,62 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import six
from oslo_config import cfg
from oslo_log import log

from delfin import db
from delfin.common.constants import TelemetryCollection
from delfin.coordination import ConsistentHashing
from delfin.task_manager import metrics_rpcapi as task_rpcapi

CONF = cfg.CONF
LOG = log.getLogger(__name__)


class TaskDistributor(object):
def __init__(self, ctx):
self.ctx = ctx
self.task_rpcapi = task_rpcapi.TaskAPI()

def distribute_new_job(self, task_id):
partitioner = ConsistentHashing()
partitioner.start()
executor = partitioner.get_task_executor(task_id)
try:
db.task_update(self.ctx, task_id, {'executor': executor})
LOG.info('Distribute a new job, id: %s' % task_id)
self.task_rpcapi.assign_job(self.ctx, task_id, executor)
except Exception as e:
LOG.error('Failed to distribute the new job, reason: %s',
six.text_type(e))
raise e

def distribute_failed_job(self, failed_task_id, executor):

try:
db.failed_task_update(self.ctx, failed_task_id,
{'executor': executor})
LOG.info('Distribute a failed job, id: %s' % failed_task_id)
self.task_rpcapi.assign_failed_job(self.ctx, failed_task_id,
executor)
except Exception as e:
LOG.error('Failed to distribute failed job, reason: %s',
six.text_type(e))
raise e

@classmethod
def job_interval(cls):
return TelemetryCollection.PERIODIC_JOB_INTERVAL
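
Read together with PerfJobManager above, the intended flow is: the API creates a task row, PerfJobManager.add_new_job hands its id to TaskDistributor, and distribute_new_job stamps the chosen executor on the row before casting assign_job to it. A short driving example; get_admin_context is an assumed convenience helper, any RequestContext would do:

from delfin import context
from delfin.leader_election.distributor.task_distributor import TaskDistributor

ctxt = context.get_admin_context()  # assumed helper; any RequestContext works
distributor = TaskDistributor(ctxt)

# 'task-id' stands in for the id returned by db.task_create().
distributor.distribute_new_job('task-id')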
3 changes: 2 additions & 1 deletion delfin/leader_election/factory.py
@@ -36,9 +36,10 @@ def construct_elector(plugin, leader_key=None):
scheduler_mgr = SchedulerManager()

if plugin == "tooz":
scheduler_mgr.start()
# Create callback object
callback = ToozLeaderElectionCallback.register(
on_leading_callback=scheduler_mgr.start,
on_leading_callback=scheduler_mgr.schedule_boot_jobs,
on_stop_callback=scheduler_mgr.stop)

return Elector(callback, leader_election_key)
17 changes: 0 additions & 17 deletions delfin/task_manager/manager.py
@@ -43,15 +43,6 @@ def sync_storage_resource(self, context, storage_id, resource_task):
device_obj = cls(context, storage_id)
device_obj.sync()

def collect_telemetry(self, context, storage_id, telemetry_task,
args, start_time, end_time):
LOG.debug("Collecting resource metrics: {0} request for storage"
" id:{1}".format(args, storage_id))
cls = importutils.import_class(telemetry_task)
device_obj = cls()
return device_obj.collect(context, storage_id, args, start_time,
end_time)

def remove_storage_resource(self, context, storage_id, resource_task):
cls = importutils.import_class(resource_task)
device_obj = cls(context, storage_id)
@@ -63,14 +54,6 @@ def remove_storage_in_cache(self, context, storage_id):
drivers = driver_manager.DriverManager()
drivers.remove_driver(storage_id)

def remove_telemetry_instances(self, context, storage_id, telemetry_task):
LOG.info('Remove telemetry instances for storage id:{0}')
cls = importutils.import_class(telemetry_task)
device_obj = cls()
return device_obj.remove_telemetry(context,
storage_id,
)

def sync_storage_alerts(self, context, storage_id, query_para):
LOG.info('Alert sync called for storage id:{0}'
.format(storage_id))
60 changes: 60 additions & 0 deletions delfin/task_manager/metrics_manager.py
@@ -0,0 +1,60 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
periodical task manager for metric collection tasks**
"""
from oslo_log import log

from delfin import manager
from delfin.coordination import ConsistentHashing
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import FailedJobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import JobHandler
from delfin.task_manager.tasks import telemetry

LOG = log.getLogger(__name__)


class MetricsTaskManager(manager.Manager):
"""manage periodical tasks"""

RPC_API_VERSION = '1.0'

def __init__(self, service_name=None, *args, **kwargs):
self.telemetry_task = telemetry.TelemetryTask()
super(MetricsTaskManager, self).__init__(*args, **kwargs)
scheduler = schedule_manager.SchedulerManager()
scheduler.start()
JobHandler.schedule_boot_jobs()
partitioner = ConsistentHashing()
partitioner.start()
partitioner.join_group()

def assign_job(self, context, task_id):
instance = JobHandler.get_instance(context, task_id)
instance.schedule_job(task_id)

def remove_job(self, context, task_id):
instance = JobHandler.get_instance(context, task_id)
instance.remove_job(task_id)

def assign_failed_job(self, context, failed_task_id):
instance = FailedJobHandler.get_instance(context, failed_task_id)
instance.schedule_failed_job(failed_task_id)

def remove_failed_job(self, context, failed_task_id):
instance = FailedJobHandler.get_instance(context, failed_task_id)
instance.remove_failed_job(failed_task_id)
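
The companion delfin/task_manager/metrics_rpcapi.py used by the distributor is not rendered in this excerpt. Since cmd/task.py starts this service with topic=CONF.host, a plausible sketch of how assign_job reaches a specific executor looks like the following; the class layout and rpc.get_client usage are assumptions modelled on the other delfin RPC clients:

import oslo_messaging as messaging
from oslo_config import cfg

from delfin import rpc

CONF = cfg.CONF


class TaskAPI(object):
    """Client-side metrics task RPC API (sketch)."""

    RPC_API_VERSION = '1.0'

    def __init__(self):
        target = messaging.Target(topic=CONF.host,
                                  version=self.RPC_API_VERSION)
        self.client = rpc.get_client(target, version_cap=self.RPC_API_VERSION)

    def assign_job(self, context, task_id, executor):
        # Re-target the cast at the executor's per-host topic; the metrics
        # task service on that host listens on topic=CONF.host and dispatches
        # to MetricsTaskManager.assign_job.
        call_context = self.client.prepare(topic=executor, version='1.0')
        return call_context.cast(context, 'assign_job', task_id=task_id)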