Skip to content

Commit

Permalink
Remove the periodically call from task distributor (#686)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThisIsClark authored Sep 6, 2021
1 parent be004ce commit cd62e0b
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 17 deletions.
1 change: 1 addition & 0 deletions delfin/api/v1/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def delete(self, req, id):
'.'
+ subclass.__name__)
self.task_rpcapi.remove_storage_in_cache(ctxt, storage['id'])
perf_job_controller.delete_perf_job(ctxt, storage['id'])

@wsgi.response(202)
def sync_all(self, req):
Expand Down
16 changes: 0 additions & 16 deletions delfin/leader_election/distributor/task_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,6 @@ def __init__(self, ctx):
self.ctx = ctx
self.task_rpcapi = task_rpcapi.TaskAPI()

def __call__(self):
""" Schedule the collection tasks based on interval """

try:
# Remove jobs from scheduler when marked for delete
filters = {'deleted': True}
tasks = db.task_get_all(self.ctx, filters=filters)
LOG.debug("Total tasks found deleted "
"in this cycle:%s" % len(tasks))
for task in tasks:
self.task_rpcapi.remove_job(self.ctx, task['id'],
task['executor'])
except Exception as e:
LOG.error("Failed to remove periodic scheduling job , reason: %s.",
six.text_type(e))

def distribute_new_job(self, task_id):
partitioner = ConsistentHashing()
partitioner.start()
Expand Down
11 changes: 11 additions & 0 deletions delfin/task_manager/perf_job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,14 @@ def create_perf_job(context, storage_id, capabilities):
filters = {'storage_id': storage_id}
task_id = db.task_get_all(context, filters=filters)[0].get('id')
metrics_rpcapi.TaskAPI().create_perf_job(context, task_id)


def delete_perf_job(context, storage_id):
# Delete it from scheduler
filters = {'storage_id': storage_id}
tasks = db.task_get_all(context, filters=filters)
for task in tasks:
metrics_rpcapi.TaskAPI().remove_job(context, task.get('id'),
task.get('executor'))
# Delete it from db
db.task_delete_by_storage(context, storage_id)
1 change: 0 additions & 1 deletion delfin/task_manager/scheduler/schedule_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
LOG = log.getLogger(__name__)

SCHEDULER_BOOT_JOBS = [
TaskDistributor.__module__ + '.' + TaskDistributor.__name__,
FailedTaskDistributor.__module__ + '.' + FailedTaskDistributor.__name__
]

Expand Down

0 comments on commit cd62e0b

Please sign in to comment.