Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic sub-processes for metrics collection #708

Merged
merged 2 commits into from
Sep 28, 2021

Conversation

joseph-v
Copy link
Collaborator

@joseph-v joseph-v commented Sep 22, 2021

What this PR does / why we need it:
Support dynamic sub-processes for metrics collection

Which issue this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close that issue when PR gets merged): fixes #

Special notes for your reviewer:

Release note:

Dynamic subprocess is an optimization to better use of Node resources by spawning multiple python processess in the same node, for the collection.

Following are the configuration options are available for this feature and their corresponding default values. These values may be changed in the 'TELEMETRY' section of delfin.conf file

enable_dynamic_subprocess = False
process_cleanup_interval = 60
task_cleanup_delay = 10
group_change_detect_interval = 30
max_storages_in_child = 5
max_childs_in_node = 100000

This feature can be enabled with below lines in delfin.conf file,

[TELEMETRY]
enable_dynamic_subprocess = True

If the storages handled in a subprocess increases to more than configured value (max_storages_in_child) a new subprocess is spawn by the metrics manager to handle storages

task_cleanup_delay is the minimum delay in seconds before stopping the subprocess by metrics manager, so that it can handle the remove job/remove failed job

process_cleanup_interval is the interval in seconds a clean up function executes to remove unused subprocesses

group_change_detect_interval is interval in seconds, watcher for the process join and process leave callbacks are checked

max_childs_in_node number process that can be created in node before raising exception. Large buffer is needed as process removal may be delayed.

Test cases for this feature is available in link: https://docs.google.com/spreadsheets/d/1uy7B4nVSI_T9qM_Sc66A7nK_lr6RIEkC/edit#gid=1006830486

Tested the feature in single node environment
Test report
https://docs.google.com/spreadsheets/d/1X9igJZnjzx9viI6wmFpqnJN1njO6JGJe/edit#gid=525327673

@codecov
Copy link

codecov bot commented Sep 22, 2021

Codecov Report

Merging #708 (dc82566) into master (26426d8) will decrease coverage by 0.29%.
The diff coverage is 45.33%.

@@            Coverage Diff             @@
##           master     #708      +/-   ##
==========================================
- Coverage   70.81%   70.51%   -0.30%     
==========================================
  Files         161      163       +2     
  Lines       15600    15871     +271     
  Branches     1934     1972      +38     
==========================================
+ Hits        11047    11192     +145     
- Misses       3908     4020     +112     
- Partials      645      659      +14     
Impacted Files Coverage Δ
delfin/common/config.py 95.00% <ø> (ø)
delfin/task_manager/metrics_rpcapi.py 50.00% <ø> (ø)
delfin/task_manager/scheduler/schedule_manager.py 66.66% <0.00%> (ø)
delfin/task_manager/subprocess_manager.py 0.00% <0.00%> (ø)
delfin/coordination.py 60.22% <43.58%> (-4.37%) ⬇️
delfin/service.py 26.82% <50.00%> (+0.77%) ⬆️
delfin/task_manager/metrics_manager.py 55.34% <51.75%> (+55.34%) ⬆️
delfin/task_manager/subprocess_rpcapi.py 53.33% <53.33%> (ø)
...ager/scheduler/schedulers/telemetry/job_handler.py 71.32% <100.00%> (+0.95%) ⬆️
delfin/drivers/fake_storage/__init__.py 94.79% <0.00%> (-0.28%) ⬇️
... and 4 more

@joseph-v joseph-v force-pushed the optional-dynamic branch 5 times, most recently from 70ab846 to 45cecb5 Compare September 23, 2021 05:43
LOG.info("GROUP {0} already exist".format(group))

def delete_group(self, group):
# Create the group
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong comment for the function!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, removed.


def leave_group(self, group):
try:
# Join the group
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong comment!


def get_members(self, group):
try:
# Join the group
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here!

@joseph-v joseph-v force-pushed the optional-dynamic branch 6 times, most recently from d994e10 to 81eafe9 Compare September 23, 2021 10:28
NajmudheenCT
NajmudheenCT previously approved these changes Sep 27, 2021
Copy link
Member

@NajmudheenCT NajmudheenCT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sushanthakumar
Copy link
Collaborator

LGTM

NajmudheenCT
NajmudheenCT previously approved these changes Sep 27, 2021
Copy link
Member

@NajmudheenCT NajmudheenCT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

kumarashit
kumarashit previously approved these changes Sep 27, 2021
Copy link
Collaborator

@kumarashit kumarashit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

cfg.IntOpt('max_storages_in_child',
default=5,
help='Max storages handled by one local executor process'),
cfg.IntOpt('max_childs_in_node',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this default value ok? It means allowing 100000 process?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we do not restrict number of processes created. Used a large number as default, before raising exception. Also, delete of process, when having no storages to handle takes about 90 seconds. So large number will provide a buffer, if we create and delete storage frequently

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ThisIsClark This can be customised based on user enthronement based on their deployment configuration. For example this can be set to number of cores available in a node.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if storages is greater than limitations, what will happen

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not restricting storages, we allow storages to get registered.


def create_group(self, group):
try:
self.coordinator.create_group(group.encode()).get()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why call get after called create_group

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_group() is async call, following get() ensure that create group is completed

try:
self.coordinator.delete_group(group.encode()).get()
except coordination.GroupNotCreated:
LOG.info("GROUP {0} Group not created".format(group))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First letter of first word should be in upper case, others should be in lower case.
Same as other log

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -14,18 +14,30 @@
"""
periodical task manager for metric collection tasks**
"""
import datetime
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please optimize the import order

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

self.rpcapi.assign_failed_job_local(
context, f_task['id'], executor_topic)

def process_cleanup(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except the periodically cleanup, do we have another mechanism to cleanup it forwardly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a storage is deleted and the process do not handle any other storages, we want stop the processes after a delay (for the process to handle remove storage). We send the remove storage message, wait for the message to be handled and cleanup later.

Copy link
Collaborator

@wisererik wisererik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@NajmudheenCT NajmudheenCT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@NajmudheenCT NajmudheenCT merged commit ca4b819 into sodafoundation:master Sep 28, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants