
Better instrumentation for Worker.gather_dep #7217

Open
Tracked by #7345
fjetter opened this issue Oct 28, 2022 · 4 comments · Fixed by #7560 or #7586
Labels
diagnostics, enhancement, performance, scheduling

Comments

@fjetter
Member

fjetter commented Oct 28, 2022

Task queuing has been proven to significantly improve performance by reducing root task overproduction.

In recent benchmarks and tests I noticed that one major source of root task overproduction is not necessarily that reducers are assigned to workers too slowly, but that the workers are unable to run these tasks because they first need to fetch dependencies. If the average root task runtime is much smaller than the time it takes to fetch dependencies, a worker can end up running many data producers before it gets a chance to run a reducer.

Right now, we're almost blind to this situation, but we could expose much better metrics on the dashboard (or via Prometheus).

Specifically, I'm interested in

  • How much time do tasks spend in the ready queue before they are worked on? Can we calculate averages per TaskGroup/Prefix? Per TaskGroup per worker?
  • How much time do tasks spend in any given state, e.g. fetch? In general, how long are the wait times in our queues?
  • How long do gather_dep requests typically last, broken down per TaskGroup/Prefix?
  • Within a gather_dep request, how much time are we spending on each of the following? (See the sketch after this list.)
    • Connection establishment (e.g. connection pool empty, remote event loop is blocked, handshake takes a while)
    • Data (de-)serialization
    • Spill-to-disk
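
For illustration, here is a minimal sketch of how per-phase timings could be accumulated around a gather-like request. The helper meter, the phase_totals accumulator, and the callables passed to gather_from_peer are hypothetical and not part of the distributed API; they only show where the timers would go.

    import time
    from collections import defaultdict
    from contextlib import contextmanager

    # Hypothetical accumulator: (task prefix, activity) -> total seconds
    phase_totals = defaultdict(float)

    @contextmanager
    def meter(prefix, activity):
        start = time.perf_counter()
        try:
            yield
        finally:
            phase_totals[prefix, activity] += time.perf_counter() - start

    def gather_from_peer(prefix, connect, request, deserialize, store):
        # Meter each phase of a gather-like request separately
        with meter(prefix, "connect"):
            comm = connect()
        with meter(prefix, "transfer"):
            frames = request(comm)
        with meter(prefix, "deserialize"):
            data = deserialize(frames)
        with meter(prefix, "store"):  # may include spill-to-disk
            store(data)
        return data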

Ideally, I would love to get data for a Task X with dependencies deps that tells me

X spent 10s in ready queue
-> 8s spent fetching data
  -> 1s connection
  -> 2s spill-to-disk
  -> 2s (de-)serialization
  -> 2s network transfer
  -> 1s idle / event loop busy
-> 2s spent waiting for open slot on the ThreadPool
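
For concreteness, such a report could be represented as a nested mapping; the keys below are illustrative only and mirror the example above (all values in seconds):

    task_report = {
        "task": "X",
        "ready": 10.0,  # total time in the ready queue
        "breakdown": {
            "fetching data": {
                "connection": 1.0,
                "spill-to-disk": 2.0,
                "(de-)serialization": 2.0,
                "network transfer": 2.0,
                "idle / event loop busy": 1.0,
            },
            "waiting for ThreadPool slot": 2.0,
        },
    }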

Some of this information is already available, other information we still need to collect. I don't think we have anything that can break it up this way and/or group by TaskGroups or individual tasks.

I think this kind of visibility would help us significantly with making decisions about optimizations, e.g. should we prioritize STA? Should we focus on getting a sendfile implementation up and running? Do connection attempts take way too long because event loops are blocked?

@crusaderky
Collaborator

crusaderky commented Feb 10, 2023

There are two fundamental task journeys:

execute

  • waiting
    • (read below for breakdown)
  • ready / constrained
  • executing / long-running
    • _maybe_deserialize_task
    • _prepare_args_for_execution (unspill)
      • disk read
      • deserialization
    • run task in thread (per-thread timer)
    • run task in thread (wall clock - thread timer), e.g. GIL contention, more threads than cores, or significant CPU use in the main thread (see the sketch after this list)
    • run task in thread (seceded)
    • spill upon insertion in the SpillBuffer
      • serialization (typically, of other tasks)
      • disk write (typically, of other tasks)
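
The wall clock vs. thread timer split above can be measured with the standard library; a minimal sketch (the returned metric names are made up):

    import time

    def run_with_timers(func, *args, **kwargs):
        # Wall-clock time vs. per-thread CPU time for one task execution.
        # The difference approximates time the thread was not running on a CPU,
        # e.g. GIL contention or more threads than cores.
        wall0 = time.perf_counter()
        cpu0 = time.thread_time()
        result = func(*args, **kwargs)
        wall = time.perf_counter() - wall0
        cpu = time.thread_time() - cpu0
        return result, {"thread-cpu": cpu, "thread-noncpu": max(wall - cpu, 0.0)}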

gather

This is either triggered by a dependent in waiting state or by the Active Memory Manager.

  • no-worker
  • fetch
    • throttling incoming transfers (local saturated)
    • all peers are busy or in flight (remote saturated)
  • flight
    • failed network transfers (peer doesn't respond, or responds that it's busy or that it doesn't have the key)
    • successful network transfer
    • deserialization
    • spill upon insertion in the SpillBuffer
      • serialization (typically, of other tasks)
      • disk write (typically, of other tasks)

A few observations:

[1]

When gathering, both the successful network transfer and the deserialization times need to be apportioned to the keys bundled together by _select_keys_for_gather. The simplest, albeit very crude, way to do it is to just divide these times by the number of keys. I am disinclined to weight this division by the task sizes, because it would make it much more complicated (you'd need to use serialized and compressed sizes for transfers, total uncompressed size for decompression, and pickled size without buffers for unpickling time).
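
A sketch of that crude apportioning (the function and its arguments are hypothetical; in practice the inputs would come from the keys bundled by _select_keys_for_gather and the whole-request timers):

    def apportion_evenly(keys, transfer_seconds, deserialize_seconds):
        # Split the whole-request durations evenly across the bundled keys
        n = len(keys)
        return {
            key: {
                "network transfer": transfer_seconds / n,
                "deserialization": deserialize_seconds / n,
            }
            for key in keys
        }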

[2]

When gathering, it's probably a lot more interesting to know why you're gathering (read: group by the prefix of the dependent) than what you're gathering (group by the prefix of the task itself).

[3]

It's not straightforward to break down the waiting time when more than one dependency needs fetching.
I'm leaning towards:

  1. acquire all the timings of all dependencies, broken down by their 'gather' journey above
  2. divide the timings of each dependency by the number of dependents
  3. sum the timings from step 2 for all dependencies of the task that just transitioned from waiting to ready
  4. rescale the timings from step 3 by the time in waiting

Sketched in code below; the attribute names waiting_dependents_at_flight, gather_timings and time_in_waiting are illustrative, not actual WorkerState fields:

    from collections import Counter

    def on_transition_fetch_to_flight(ts):
        # Remember which dependents were blocked in "waiting" when the fetch started
        ts.waiting_dependents_at_flight = {
            dep for dep in ts.dependents if dep.state == "waiting"
        }

    def on_transition_waiting_to_ready(ts):
        timings = Counter()
        for dep in ts.dependencies:
            if not dep.gather_timings:  # dependency was not fetched
                continue
            if ts not in dep.waiting_dependents_at_flight:
                continue
            # Step 2: apportion the dependency's gather timings across its waiting dependents
            n_dependents = len(dep.waiting_dependents_at_flight)
            for activity, duration in dep.gather_timings.items():
                timings[activity] += duration / n_dependents
        # Step 4: rescale so the breakdown adds up to the actual time spent in waiting
        total = sum(timings.values())
        if total:
            rescale_factor = ts.time_in_waiting / total
            for activity in timings:
                timings[activity] *= rescale_factor
        return timings

Again, this is imperfect, but it should handle two big use cases reasonably well (the numbers are worked out below):
A) fetch two large dependencies from the same worker. They are transferred serially, over two gather_dep calls.
time in waiting: 10s
gather of first dependency: 5s
gather of second dependency: 5s

B) fetch two dependencies from different workers. They are transferred at the same time.
time in waiting: 5s
gather of first dependency: 5s
gather of second dependency: 5s
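
Working through the rescaling from step 4 (assuming each dependency has only this one waiting dependent, so each dependency contributes its full 5s): in case A the timings sum to 10s and the task spent 10s in waiting, so the factor is 1.0; in case B they sum to 10s while the task only waited 5s, so every activity is halved. A tiny sketch:

    def rescale(timings, time_in_waiting):
        factor = time_in_waiting / sum(timings.values())
        return {activity: duration * factor for activity, duration in timings.items()}

    # Case A: serial transfers, 10s in waiting -> factor 1.0
    print(rescale({"gather dep1": 5.0, "gather dep2": 5.0}, 10.0))
    # Case B: concurrent transfers, 5s in waiting -> factor 0.5
    print(rescale({"gather dep1": 5.0, "gather dep2": 5.0}, 5.0))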

[4]

The above algorithm is fairly complex and requires collecting a wealth of new information in the worker state machine.

[5]

Those are... a lot of states to break down by task prefix. It would generate several pages' worth of plain text in the Prometheus output, and I'm concerned the data volume would be far from trivial.

Not breaking everything down by task prefix is probably wiser.

I would pragmatically suggest:

  1. time spent in each TaskStateState, broken down by task prefix (#7560: Meter how long each task prefix stays in each state); a rough Prometheus sketch follows below
  2. breakdown by activity of the executing state, NOT broken down by task prefix (out of scope here; see #7565: Top-level cluster efficiency metric)
  3. breakdown by activity of the flight state, NOT broken down by task prefix.

The above would produce less overwhelming data, and would make both the apportioning of deserialization times and waiting times unnecessary.
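
As a rough illustration of suggestion 1, a per-prefix, per-state counter could be exported with prometheus_client; the metric and label names below are invented for this sketch and may differ from whatever #7560 actually ships:

    from prometheus_client import Counter

    # Hypothetical metric: cumulative seconds task prefixes spend in each worker state
    task_state_seconds = Counter(
        "dask_worker_task_prefix_state_seconds",
        "Cumulative time task prefixes spent in each worker state",
        ["prefix", "state"],
    )

    def on_state_exit(prefix, state, duration):
        # Called whenever a task leaves a state, with the time it spent there
        task_state_seconds.labels(prefix=prefix, state=state).inc(duration)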

@crusaderky
Collaborator

The breakdown of execute described above is out of scope for this ticket and is discussed in #7565.

@fjetter
Member Author

fjetter commented Jun 13, 2023

I suggest we "park" this ticket for now, wrap up what we're currently doing around spans and metrics, and reassess what we want on top of that. I actually think that most of what I was looking for with this ticket will have been implemented by then.
