Dump cluster state for debugging #5068

Closed

fjetter opened this issue Jul 15, 2021 · 1 comment

fjetter (Member) commented Jul 15, 2021

In the past months, we've seen occasional deadlock issues connected to the scheduler/worker state machine. To debug this, I wrote a small script which essentially fetches and serializes the entire state of a stuck cluster (see below). Think Scheduler.story + Scheduler.get_logs + much more for everything.
The script is very crude and fetches the information without trying to properly serialize all objects. The emphasis was rather on having a fully JSON-serializable representation of the cluster to allow for easier portability, formatting, etc.

I wanted to preserve this for posterity in case it helps anyone.

Further, I am wondering whether we want such functionality as a first-class citizen. Think of Client.collect_cluster_state (note: this may create several GBs worth of data). The client function would then try to do a better job. Sometimes it would be sufficient to extend or modify the existing identity methods, but it should be done much more thoroughly, of course. If I were to implement this properly, I would likely start by attaching a to_dict method to all our classes which yields a JSON/YAML-serializable representation (including task state, worker state, etc.)
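To make that concrete, here is a minimal sketch of the to_dict pattern I have in mind, using a toy stand-in class (everything below is hypothetical; neither to_dict nor Client.collect_cluster_state exists in distributed today):

import json


class ToyTaskState:
    # Stand-in for distributed's TaskState; purely illustrative.
    def __init__(self, key, state, dependencies=()):
        self.key = key
        self.state = state
        self.dependencies = list(dependencies)

    def to_dict(self):
        # Reduce nested state objects to their keys so the result is
        # JSON-serializable and free of cyclic references.
        return {
            "key": self.key,
            "state": self.state,
            "dependencies": [dep.key for dep in self.dependencies],
        }


ts = ToyTaskState("b", "processing", dependencies=[ToyTaskState("a", "memory")])
print(json.dumps(ts.to_dict(), indent=2))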

Thoughts? Would people consider this helpful?

Disclaimer

The script strips code, tracebacks, and exceptions from all output, so there should be no IP leak, but I don't guarantee anything if there are sensitive logs, task keys, IP addresses, etc.

Last script update: 2021-09-29

from collections import deque


def _normalize(o, simple=False):
    from distributed.scheduler import TaskState as TSScheduler
    from distributed.scheduler import WorkerState
    from distributed.worker import TaskState as TSWorker

    # Blacklist runspec since this includes serialized functions
    # and arguments which might include sensitive data
    blacklist_attributes = [
        "runspec",
        "_run_spec",
        "exception",
        "traceback",
        "_exception",
        "_traceback",
    ]

    if isinstance(o, dict):
        try:
            res = {}
            for k, v in o.items():
                k = _normalize(k, simple=simple)
                try:
                    hash(k)
                except TypeError:
                    k = str(k)
                v = _normalize(v, simple=simple)
                res[k] = v
            return res

        except TypeError:
            print({k: type(k) for k in o})
            raise
    elif isinstance(o, (set, deque, tuple, list)):
        return [_normalize(el, simple=simple) for el in o]
    elif isinstance(o, WorkerState):
        res = o.identity()
        res["memory"] = {
            "managed": o.memory.managed,
            "managed_in_memory": o.memory.managed_in_memory,
            "managed_spilled": o.memory.managed_spilled,
            "unmanaged": o.memory.unmanaged,
            "unmanaged_recent": o.memory.unmanaged_recent,
        }
        return res
    elif isinstance(o, TSScheduler):
        if simple:
            # Due to cyclic references in the dependent/dependency graph
            # mapping, this causes an infinite recursion
            return str(o)
        base = {
            "type": str(type(o)),
            "repr": str(o),
        }
        base.update(
            {
                s: _normalize(getattr(o, s), simple=True)
                for s in TSScheduler.__slots__
                if s not in blacklist_attributes
            }
        )
        return base
    elif isinstance(o, TSWorker):
        if simple:
            # Due to cyclic references in the dependent/dependency graph
            # mapping, this causes an infinite recursion
            return str(o)
        return _normalize(
            {k: v for k, v in o.__dict__.items() if k not in blacklist_attributes},
            simple=True,
        )
    else:
        return str(o)


def get_worker_info(dask_worker):
    import dask

    return _normalize(
        {
            "status": dask_worker.status,
            "ready": dask_worker.ready,
            "constrained": dask_worker.constrained,
            "long_running": dask_worker.long_running,
            "executing_count": dask_worker.executing_count,
            "in_flight_tasks": dask_worker.in_flight_tasks,
            "in_flight_workers": dask_worker.in_flight_workers,
            "paused": dask_worker.paused if hasattr(dask_worker, "paused") else None,
            "log": dask_worker.log,
            "tasks": dask_worker.tasks,
            "memory_limit": dask_worker.memory_limit,
            "memory_target_fraction": dask_worker.memory_target_fraction,
            "memory_spill_fraction": dask_worker.memory_spill_fraction,
            "memory_pause_fraction": dask_worker.memory_pause_fraction,
            "logs": dask_worker.get_logs(),
            "config": dict(dask.config.config),
            "incoming_transfer_log": list(dask_worker.incoming_transfer_log),
            "outgoing_transfer_log": list(dask_worker.outgoing_transfer_log),
        }
    )


def get_scheduler_info(dask_scheduler):
    import dask

    state = {
        "transition_log": dask_scheduler.transition_log,
        "log": dask_scheduler.log,
        "tasks": dask_scheduler.tasks,
        "workers": dask_scheduler.workers,
        "logs": dask_scheduler.get_logs(),
        "config": dict(dask.config.config),
        "events": dask_scheduler.events,
    }
    if "stealing" in dask_scheduler.extensions:
        ext = dask_scheduler.extensions["stealing"]
        attrs = [
            "stealable_all",
            "stealable",
            "key_stealable",
            "in_flight",
        ]
        stealing = {at: getattr(ext, at) for at in attrs}

        in_flight_occ = ext.in_flight_occupancy
        stealing["in_flight_occupancy"] = {
            ws.name: v for ws, v in in_flight_occ.items()
        }
        state["stealing"] = stealing
    return _normalize(state)


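# Assumes `client` is a connected distributed.Client attached to the stuck cluster.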
worker_info = client.run(get_worker_info)
scheduler_info = client.run_on_scheduler(get_scheduler_info)
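Since everything returned is already JSON-serializable, the result can be written straight to disk, e.g. (the file name is arbitrary):

import json

with open("cluster_dump.json", "w") as f:
    json.dump({"workers": worker_info, "scheduler": scheduler_info}, f, indent=2)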