Skip to content

Commit

Permalink
Re-enable profiling of scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Nov 24, 2020
1 parent 177c458 commit c175d22
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import atexit
from collections import defaultdict, deque
from collections.abc import Mapping, Set
from contextlib import suppress
Expand Down Expand Up @@ -93,6 +94,20 @@
else:
import pickle

try:
import line_profiler

profiler = line_profiler.LineProfiler()

def dump_stats(p):
s = p.get_stats()
if any(s.timings.values()):
profiler.dump_stats(f"prof_{os.getpid()}.lstat")

atexit.register(dump_stats, profiler)
except ImportError:
def profile(func):
return func

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -2577,6 +2592,7 @@ def validate_state(self, allow_overlap=False):
# Manage Messages #
###################

@profiler
def report(self, msg, ts=None, client=None):
"""
Publish updates to all listening Queues and Comms
Expand Down Expand Up @@ -2690,6 +2706,7 @@ def remove_client_from_events():
)
self.loop.call_later(cleanup_delay, remove_client_from_events)

@profiler
def send_task_to_worker(self, worker, key):
""" Send a single computational task to a worker """
try:
Expand Down Expand Up @@ -2859,6 +2876,7 @@ def remove_plugin(self, plugin):
""" Remove external plugin from scheduler """
self.plugins.remove(plugin)

@profiler
def worker_send(self, worker, msg):
"""Send message to worker
Expand Down Expand Up @@ -3672,6 +3690,7 @@ def update_data(
if client:
self.client_desires_keys(keys=list(who_has), client=client)

@profiler
def report_on_key(self, key=None, ts=None, client=None):
assert (key is None) + (ts is None) == 1, (key, ts)
if ts is None:
Expand Down Expand Up @@ -3947,6 +3966,7 @@ async def register_worker_plugin(self, comm, plugin, name=None):
# State Transitions #
#####################

@profiler
def _remove_from_processing(self, ts, send_worker_msg=None):
"""
Remove *ts* from the set of processing tasks.
Expand All @@ -3967,6 +3987,7 @@ def _remove_from_processing(self, ts, send_worker_msg=None):
if send_worker_msg:
self.worker_send(w, send_worker_msg)

@profiler
def _add_to_memory(
self, ts, ws, recommendations, type=None, typename=None, **kwargs
):
Expand Down Expand Up @@ -4012,6 +4033,7 @@ def _add_to_memory(
if ts in cs.wants_what:
self.client_releases_keys(client="fire-and-forget", keys=[ts.key])

@profiler
def transition_released_waiting(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4063,6 +4085,7 @@ def transition_released_waiting(self, key):
pdb.set_trace()
raise

@profiler
def transition_no_worker_waiting(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4107,6 +4130,7 @@ def transition_no_worker_waiting(self, key):
pdb.set_trace()
raise

@profiler
def decide_worker(self, ts):
"""
Decide on a worker for task *ts*. Return a WorkerState.
Expand Down Expand Up @@ -4147,6 +4171,7 @@ def decide_worker(self, ts):

return worker

@profiler
def transition_waiting_processing(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4193,6 +4218,7 @@ def transition_waiting_processing(self, key):
pdb.set_trace()
raise

@profiler
def transition_waiting_memory(self, key, nbytes=None, worker=None, **kwargs):
try:
ws = self.workers[worker]
Expand Down Expand Up @@ -4228,6 +4254,7 @@ def transition_waiting_memory(self, key, nbytes=None, worker=None, **kwargs):
pdb.set_trace()
raise

@profiler
def transition_processing_memory(
self,
key,
Expand Down Expand Up @@ -4335,6 +4362,7 @@ def transition_processing_memory(
pdb.set_trace()
raise

@profiler
def transition_memory_released(self, key, safe=False):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4394,6 +4422,7 @@ def transition_memory_released(self, key, safe=False):
pdb.set_trace()
raise

@profiler
def transition_released_erred(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4435,6 +4464,7 @@ def transition_released_erred(self, key):
pdb.set_trace()
raise

@profiler
def transition_erred_released(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4469,6 +4499,7 @@ def transition_erred_released(self, key):
pdb.set_trace()
raise

@profiler
def transition_waiting_released(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4505,6 +4536,7 @@ def transition_waiting_released(self, key):
pdb.set_trace()
raise

@profiler
def transition_processing_released(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4549,6 +4581,7 @@ def transition_processing_released(self, key):
pdb.set_trace()
raise

@profiler
def transition_processing_erred(
self, key, cause=None, exception=None, traceback=None, **kwargs
):
Expand Down Expand Up @@ -4618,6 +4651,7 @@ def transition_processing_erred(
pdb.set_trace()
raise

@profiler
def transition_no_worker_released(self, key):
try:
ts = self.tasks[key]
Expand All @@ -4644,6 +4678,7 @@ def transition_no_worker_released(self, key):
pdb.set_trace()
raise

@profiler
def remove_key(self, key):
ts = self.tasks.pop(key)
assert ts.state == "forgotten"
Expand All @@ -4657,6 +4692,7 @@ def remove_key(self, key):
if key in self.task_metadata:
del self.task_metadata[key]

@profiler
def _propagate_forgotten(self, ts, recommendations):
ts.state = "forgotten"
key = ts.key
Expand Down Expand Up @@ -4694,6 +4730,7 @@ def _propagate_forgotten(self, ts, recommendations):
)
ts.who_has.clear()

@profiler
def transition_memory_forgotten(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4734,6 +4771,7 @@ def transition_memory_forgotten(self, key):
pdb.set_trace()
raise

@profiler
def transition_released_forgotten(self, key):
try:
ts = self.tasks[key]
Expand Down Expand Up @@ -4770,6 +4808,7 @@ def transition_released_forgotten(self, key):
pdb.set_trace()
raise

@profiler
def transition(self, key, finish, *args, **kwargs):
"""Transition a key from its current state to the finish state
Expand Down Expand Up @@ -4862,6 +4901,7 @@ def transition(self, key, finish, *args, **kwargs):
pdb.set_trace()
raise

@profiler
def transitions(self, recommendations):
"""Process transitions until none are left
Expand Down Expand Up @@ -4913,6 +4953,7 @@ def reschedule(self, key=None, worker=None):
# Assigning Tasks to Workers #
##############################

@profiler
def check_idle_saturated(self, ws, occ=None):
"""Update the status of the idle and saturated state
Expand Down Expand Up @@ -5503,6 +5544,7 @@ def adaptive_target(self, comm=None, target_duration=None):
return len(self.workers) - len(to_close)


@profiler
def decide_worker(ts, all_workers, valid_workers, objective):
"""
Decide which worker should take task *ts*.
Expand Down Expand Up @@ -5749,6 +5791,7 @@ def __init__(self, scheduler, name):
def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwargs):
self.keys.update(keys)

@profiler
def transition(self, key, start, finish, *args, **kwargs):
if finish == "memory" or finish == "erred":
ts = self.scheduler.tasks.get(key)
Expand Down

0 comments on commit c175d22

Please sign in to comment.