diff --git a/delfin/api/v1/storages.py b/delfin/api/v1/storages.py index 80061efdf..3ff34b85b 100755 --- a/delfin/api/v1/storages.py +++ b/delfin/api/v1/storages.py @@ -142,6 +142,7 @@ def delete(self, req, id): '.' + subclass.__name__) self.task_rpcapi.remove_storage_in_cache(ctxt, storage['id']) + perf_job_controller.delete_perf_job(ctxt, storage['id']) @wsgi.response(202) def sync_all(self, req): diff --git a/delfin/leader_election/distributor/task_distributor.py b/delfin/leader_election/distributor/task_distributor.py index b6d6f02cb..7ce43192d 100644 --- a/delfin/leader_election/distributor/task_distributor.py +++ b/delfin/leader_election/distributor/task_distributor.py @@ -31,22 +31,6 @@ def __init__(self, ctx): self.ctx = ctx self.task_rpcapi = task_rpcapi.TaskAPI() - def __call__(self): - """ Schedule the collection tasks based on interval """ - - try: - # Remove jobs from scheduler when marked for delete - filters = {'deleted': True} - tasks = db.task_get_all(self.ctx, filters=filters) - LOG.debug("Total tasks found deleted " - "in this cycle:%s" % len(tasks)) - for task in tasks: - self.task_rpcapi.remove_job(self.ctx, task['id'], - task['executor']) - except Exception as e: - LOG.error("Failed to remove periodic scheduling job , reason: %s.", - six.text_type(e)) - def distribute_new_job(self, task_id): partitioner = ConsistentHashing() partitioner.start() diff --git a/delfin/task_manager/perf_job_controller.py b/delfin/task_manager/perf_job_controller.py index 8b6c173d2..7dac46667 100644 --- a/delfin/task_manager/perf_job_controller.py +++ b/delfin/task_manager/perf_job_controller.py @@ -47,3 +47,14 @@ def create_perf_job(context, storage_id, capabilities): filters = {'storage_id': storage_id} task_id = db.task_get_all(context, filters=filters)[0].get('id') metrics_rpcapi.TaskAPI().create_perf_job(context, task_id) + + +def delete_perf_job(context, storage_id): + # Delete it from scheduler + filters = {'storage_id': storage_id} + tasks = db.task_get_all(context, filters=filters) + for task in tasks: + metrics_rpcapi.TaskAPI().remove_job(context, task.get('id'), + task.get('executor')) + # Delete it from db + db.task_delete_by_storage(context, storage_id) diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py index fc19e6822..766398c65 100644 --- a/delfin/task_manager/scheduler/schedule_manager.py +++ b/delfin/task_manager/scheduler/schedule_manager.py @@ -34,7 +34,6 @@ LOG = log.getLogger(__name__) SCHEDULER_BOOT_JOBS = [ - TaskDistributor.__module__ + '.' + TaskDistributor.__name__, FailedTaskDistributor.__module__ + '.' + FailedTaskDistributor.__name__ ]