Skip to content

Commit

Permalink
Support dynamic sub-processes for metrics collection (#708)
Browse files Browse the repository at this point in the history
* Support sub-processes for metrics collection
  • Loading branch information
joseph-v authored Sep 28, 2021
1 parent 26426d8 commit ca4b819
Show file tree
Hide file tree
Showing 11 changed files with 676 additions and 70 deletions.
18 changes: 18 additions & 0 deletions delfin/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@
.DEF_PERFORMANCE_HISTORY_ON_RESCHEDULE,
help='default history(in sec) to be collected on a job '
'reschedule'),
cfg.BoolOpt('enable_dynamic_subprocess',
default=False,
help='Enable dynamic subprocess metrics collection'),
cfg.IntOpt('process_cleanup_interval',
default=60,
help='Background process cleanup call interval in sec'),
cfg.IntOpt('task_cleanup_delay',
default=10,
help='Delay for task cleanup before killing child in sec'),
cfg.IntOpt('group_change_detect_interval',
default=30,
help='Local executor group change detect interval in sec'),
cfg.IntOpt('max_storages_in_child',
default=5,
help='Max storages handled by one local executor process'),
cfg.IntOpt('max_childs_in_node',
default=100000,
help='Max processes that can be spawned before forcing fail'),
]

CONF.register_opts(telemetry_opts, "telemetry")
Expand Down
54 changes: 52 additions & 2 deletions delfin/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import inspect

import decorator
import tooz

from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
Expand Down Expand Up @@ -334,7 +334,7 @@ def __init__(self):
def join_group(self):
    """Join the partitioned group named by ``self.GROUP_NAME``.

    A ``MemberAlreadyExist`` error raised by the coordinator is logged at
    info level and not re-raised. Any other exception propagates to the
    caller.
    """
    try:
        self.coordinator.join_partitioned_group(self.GROUP_NAME)
    except coordination.MemberAlreadyExist:
        # A single handler, spelled via ``coordination`` like the other
        # coordinator error handlers in this module.
        LOG.info('Member %s already in partitioner_group', CONF.host)

def get_task_executor(self, task_id):
Expand All @@ -350,3 +350,53 @@ def register_watcher_func(self, on_node_join, on_node_leave):

def watch_group_change(self):
    """Invoke ``run_watchers()`` on this object's coordinator.

    Whatever ``run_watchers()`` returns is discarded; this method itself
    returns ``None``.
    """
    coordinator = self.coordinator
    coordinator.run_watchers()


class GroupMembership(Coordinator):
    """Group create/delete/join/leave helpers built on ``Coordinator``.

    Group names are accepted as ``str`` and encoded to bytes before being
    passed to ``self.coordinator``. The coordinator errors handled below
    are logged and swallowed instead of being raised to the caller; any
    other exception propagates.
    """

    def __init__(self, agent_id):
        """Initialise the base coordinator with *agent_id* and no prefix."""
        super(GroupMembership, self).__init__(agent_id=agent_id, prefix="")

    def create_group(self, group):
        """Create *group*; an already existing group is only logged."""
        try:
            self.coordinator.create_group(group.encode()).get()
        except coordination.GroupAlreadyExist:
            LOG.info("Group %s already exist", group)

    def delete_group(self, group):
        """Delete *group* on a best-effort basis.

        A missing or non-empty group is logged at info level. Any other
        ``ToozError`` is logged as a warning together with the error text.
        Nothing is re-raised for these cases.
        """
        try:
            self.coordinator.delete_group(group.encode()).get()
        except coordination.GroupNotCreated:
            LOG.info("Group %s not created", group)
        except coordination.GroupNotEmpty:
            LOG.info("Group %s not empty", group)
        except coordination.ToozError as err:
            # Catch-all must stay last so the specific handlers above win.
            # The error text is kept so the failure can be diagnosed.
            LOG.warning("Group %s internal error while delete: %s",
                        group, err)

    def join_group(self, group):
        """Join *group*; being a member already is only logged."""
        try:
            self.coordinator.join_group(group.encode()).get()
        except coordination.MemberAlreadyExist:
            # The value available here is the group name, not a member id,
            # so the message names the group explicitly.
            LOG.info('Member already in group %s', group)

    def leave_group(self, group):
        """Leave *group*; a group that was never created is only logged."""
        # NOTE(review): only GroupNotCreated is handled here; confirm
        # whether callers expect "not a member" errors to propagate.
        try:
            self.coordinator.leave_group(group.encode()).get()
        except coordination.GroupNotCreated:
            LOG.info('Group %s not created', group)

    def get_members(self, group):
        """Return the coordinator's member listing for *group*.

        Returns ``None`` (after logging) when the group was not created.
        """
        try:
            return self.coordinator.get_members(group.encode()).get()
        except coordination.GroupNotCreated:
            LOG.info('Group %s not created', group)
            return None

    def register_watcher_func(self, group, on_process_join, on_process_leave):
        """Register join and leave callbacks for *group*."""
        group_id = group.encode()
        self.coordinator.watch_join_group(group_id, on_process_join)
        self.coordinator.watch_leave_group(group_id, on_process_leave)

    def watch_group_change(self):
        """Invoke ``run_watchers()`` on the coordinator."""
        self.coordinator.run_watchers()
23 changes: 23 additions & 0 deletions delfin/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,29 @@ def start(self):
super(TaskService, self).start()


class MetricsService(Service):
    """Service that initialises the metrics manager's scheduler on start."""

    @classmethod
    def create(cls, host=None, binary=None, topic=None,
               manager=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None,
               coordination=False, *args, **kwargs):
        """Forward every argument unchanged to the parent ``create``.

        Returns whatever the parent ``create`` returns.
        """
        parent_create = super(MetricsService, cls).create
        return parent_create(
            *args,
            host=host,
            binary=binary,
            topic=topic,
            manager=manager,
            periodic_interval=periodic_interval,
            periodic_fuzzy_delay=periodic_fuzzy_delay,
            service_name=service_name,
            coordination=coordination,
            **kwargs)

    def start(self):
        """Start the parent service, then call the manager's
        ``init_scheduler`` with this service's topic and host.
        """
        super(MetricsService, self).start()
        scheduler_init = self.manager.init_scheduler
        scheduler_init(self.topic, self.host)


class LeaderElectionService(service.Service):
"""Leader election service for distributed system
Expand Down
Loading

0 comments on commit ca4b819

Please sign in to comment.