Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make job scheduler local to task process #674

Merged

Conversation

NajmudheenCT
Copy link
Member

What this PR does / why we need it:
Enhance/Change Performance metrics collection framework to schedule jobs locally in all worker nodes.

Which issue this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close that issue when PR gets merged): fixes #670

Special notes for your reviewer:

Release note:

@NajmudheenCT NajmudheenCT changed the base branch from master to perf_coll_fw_enhance August 24, 2021 03:45
LOG = log.getLogger(__name__)


class FailedTelemetryJob(object):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, TelemetryFailedJob is better than FailedTelemetryJob

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. made it TaskDistributor

@ThisIsClark
Copy link
Collaborator

ThisIsClark commented Aug 24, 2021

I think we should remove the word Telemetry from telemetry_failed_job_distributor.py and telemetry_job_distributor.py(both the file name and the class name), because we would notify the task creation/deletion/modification to job distributor, so they do not to 'telemetry' any more

@codecov
Copy link

codecov bot commented Aug 25, 2021

Codecov Report

Merging #674 (7227911) into perf_coll_fw_enhance (baa386e) will increase coverage by 0.02%.
The diff coverage is 71.95%.

@@                   Coverage Diff                    @@
##           perf_coll_fw_enhance     #674      +/-   ##
========================================================
+ Coverage                 70.15%   70.18%   +0.02%     
========================================================
  Files                       156      159       +3     
  Lines                     14801    14936     +135     
  Branches                   1822     1822              
========================================================
+ Hits                      10384    10483      +99     
- Misses                     3816     3846      +30     
- Partials                    601      607       +6     
Impacted Files Coverage Δ
delfin/cmd/task.py 0.00% <0.00%> (ø)
delfin/leader_election/factory.py 43.75% <0.00%> (-2.92%) ⬇️
delfin/task_manager/manager.py 0.00% <ø> (ø)
delfin/task_manager/metrics_manager.py 0.00% <0.00%> (ø)
delfin/task_manager/metrics_rpcapi.py 70.00% <70.00%> (ø)
...ager/scheduler/schedulers/telemetry/job_handler.py 76.87% <76.87%> (ø)
...er_election/distributor/failed_task_distributor.py 84.84% <84.84%> (ø)
...in/leader_election/distributor/task_distributor.py 90.00% <90.00%> (ø)
delfin/db/sqlalchemy/models.py 99.66% <100.00%> (+<0.01%) ⬆️
delfin/task_manager/scheduler/schedule_manager.py 56.52% <100.00%> (+0.96%) ⬆️
... and 12 more

'%s' % job['id'])
self.task_rpcapi.assign_failed_job(self.ctx, job)

LOG.debug('Assigned failed task for id: '
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think line 58 can be debug and 62 can be info

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, this code might change according distributor implementation, currently it is a pool based distributor

return call_context.cast(context, 'remove_job',
job=job)

def assign_failed_job(self, context, job):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have just 2 apis assign and remove with job names as they differ only by job names

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the handlers for both messages are different , if we make it same we need to pass on more argument to switch between types. since it is distribution over n/w we want to reduce message size, currently we use only task_id as parameter

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

'last_run_time': last_run_time}
db.task_update(self.ctx, self.task_id, update_task_dict)
LOG.info('Periodic collection tasks scheduled for for job id: '
'%s ' % self.task_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.job_ids.add(job_id) need here right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done , thanks

@NajmudheenCT NajmudheenCT changed the title [WIP]Make job scheduler local to task process Make job scheduler local to task process Aug 26, 2021
@ThisIsClark
Copy link
Collaborator

leader_election/distributor need a init to make it as a module

@NajmudheenCT
Copy link
Member Author

leader_election/distributor need a init to make it as a module

its there !

@ThisIsClark
Copy link
Collaborator

leader_election/distributor need a init to make it as a module

its there !

I try to find that, but failed. Only test folder had __init__.py
image

@NajmudheenCT
Copy link
Member Author

leader_election/distributor need a init to make it as a module

its there !

I try to find that, but failed. Only test folder had __init__.py
image

You are right.. missed in this location.. added now


def __call__(self):
"""
:return:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the useless comments

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

def schedule_job(self, task_id):

if self.stopped:
"""If Job is stopped return immediately"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single line comment please use #

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

# limitations under the License.
"""

**periodical task manager for metric collection tasks**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment style should be same with other files, such as metrics_rpcapi.py

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


from delfin import manager
from delfin.task_manager.scheduler import schedule_manager

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blank line here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator

@ThisIsClark ThisIsClark left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sushanthakumar
Copy link
Collaborator

LGTM

@NajmudheenCT NajmudheenCT merged commit 903138a into sodafoundation:perf_coll_fw_enhance Aug 28, 2021
kumarashit added a commit that referenced this pull request Sep 14, 2021
* Make job scheduler local to task process (#674)

* Make job scheduler local to task process

* Notify distributor when a new task added (#678)

* Remove db-scan for new task creation (#680)

* Use consistent hash to manage the topic (#681)

* Remove the periodically call from task distributor (#686)

* Start one historic collection immediate when a job is rescheduled (#685)

* Start one historic collection immediate when a job is rescheduled

* Remove failed task distributor (#687)

* Improving Failed job handling and telemetry job removal (#689)

Co-authored-by: ThisIsClark <liuyuchibubao@gmail.com>
Co-authored-by: Ashit Kumar <akopensrc@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make job scheduler local to task process
3 participants