Fuse pipelines with different numbers of tasks #284

Closed
Tracked by #339
TomNicholas opened this issue Jul 31, 2023 · 3 comments · Fixed by #368
Comments


TomNicholas commented Jul 31, 2023

Last week @tomwhite and @dcherian and I discussed possible future optimizations for Cubed - this is my attempt to elucidate what I was suggesting.

Motivation

The best-case scenario for a Cubed computation is that all sequential operations get fused, because then no writing to intermediate stores is required. With no writing, every chunk moves through the whole calculation in parallel, even though multiple operations are applied to it along the way. In general we can't guarantee zero intermediate stores, because we also want to guarantee predictable memory usage during a full shuffle, but we might nevertheless aspire to fuse everything else 😁

Idea

Currently Cubed's optimization pass fuses some blockwise operations together, but it can only fuse blockwise operations that have the same number of tasks. If we could find a way to fuse blockwise operations with different numbers of tasks then potentially anything up to a full shuffle (see #282) could be fused.

Use cases

It's possible to construct cubed plans in which blockwise operations with different numbers of tasks occur sequentially.

import cubed
import cubed.array_api as xp  # used by the examples below
from cubed.core.plan import visit_nodes

def print_num_tasks_per_pipeline(plan: cubed.core.Plan, optimize_graph: bool = False):
    """Print the number of tasks needed to execute each pipeline in this plan."""
    dag = plan.optimize().dag if optimize_graph else plan.dag.copy()
    for _, node in visit_nodes(dag, resume=None):
        print(f"{node['name']}: op_name = {node['op_name']}, num_tasks = {node['pipeline'].num_tasks}")

This can happen with concat:

a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2))
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2))
c = xp.concat([a + 1, b])
c.visualize()

[plan visualization]

(I don't really understand what all the side inputs are in these graphs - I hope they don't invalidate what I'm suggesting!)

print_num_tasks_per_pipeline(c.plan)
array-010: op_name = blockwise, num_tasks = 4
array-013: op_name = blockwise, num_tasks = 6

Or matmul:

import cubed.random

spec = cubed.Spec(allowed_mem=2_000_000_000)

a = cubed.random.random(
    (50000, 50000), chunks=(5000, 5000), spec=spec
)  # 200MB chunks
b = cubed.random.random(
    (50000, 50000), chunks=(5000, 5000), spec=spec
)  # 200MB chunks
c = xp.astype(a, xp.float32)
d = xp.astype(b, xp.float32)
e = xp.matmul(c, d)
e.visualize(optimize_graph=True)

[plan visualization]

print_num_tasks_per_pipeline(e.plan, optimize_graph=True)
array-139: op_name = blockwise, num_tasks = 100
array-140: op_name = blockwise, num_tasks = 100
array-141: op_name = blockwise, num_tasks = 1000
array-145: op_name = blockwise, num_tasks = 300
array-150: op_name = blockwise, num_tasks = 100

Implementation ideas

By definition 1 task == processing one Cubed chunk, but Cubed also currently assumes that 1 Zarr chunk == 1 Cubed chunk. This is generally what sets the number of tasks in a stage, and hence which pipelines can be fused. To fuse other pipelines we have to generalize this relationship. We can't open multiple Cubed chunks per Zarr chunk because reading/writing to different parts of the same Zarr chunk would sacrifice idempotence.

However, we could imagine opening multiple Zarr chunks for one Cubed chunk (as long as the total size of the Zarr chunks opened for one Cubed chunk is < allowed_mem). This would make the number of tasks for a pipeline choosable (within some range), and we could choose how many Zarr chunks to open such that the number of tasks matches between two consecutive pipelines.

Another way to think about this: if during your computation your chunks are smaller than what your allowed_mem budget was set for, then, since each container still only loads one chunk, you are potentially "wasting" all the extra RAM you requested. Opening more chunks per container lets you use that headroom, and if all the extra chunks needed to get from one pipeline to the next fit, you could then fuse those two pipelines.
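As a rough illustration of the idea (this is just a sketch; chunk_nbytes, allowed_mem and the safety factor below are made-up parameters, not Cubed's actual memory model), choosing the number of Zarr chunks to open per task might look something like:

import math

def zarr_chunks_per_task(chunk_nbytes: int, allowed_mem: int, safety_factor: float = 4.0) -> int:
    """Rough upper bound on how many Zarr chunks one task could open at once.

    safety_factor crudely accounts for compressed copies and the operation's
    own working memory; Cubed's real accounting is more detailed than this.
    """
    return max(1, int(allowed_mem // (chunk_nbytes * safety_factor)))

def can_match_task_counts(upstream_tasks: int, downstream_tasks: int,
                          chunk_nbytes: int, allowed_mem: int) -> bool:
    """Could the upstream pipeline be shrunk to the downstream task count
    by reading more Zarr chunks per task?"""
    fan_in = math.ceil(upstream_tasks / downstream_tasks)
    return fan_in <= zarr_chunks_per_task(chunk_nbytes, allowed_mem)

# e.g. 1000 upstream tasks feeding 100 downstream tasks, 1MB chunks, 100MB allowed_mem
print(can_match_task_counts(1000, 100, chunk_nbytes=1_000_000, allowed_mem=100_000_000))  # True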

Questions

  1. Does this make any sense?
  2. Is there actually a realistic use case for this?
  3. Is this different from what's suggested in Fuse connected blockwise subgraphs #136? I tried recreating that example and noticed that most of the arrays in that subgraph have the same number of tasks, but wasn't sure if that was a coincidence.
  4. What would the actual fusion look like now? Calling each operation on a larger array (1 Cubed chunk, corresponding to multiple Zarr chunks), then doing blockwise fusion as normal?
  5. How does this scale? It's not much use if we can only fuse by having allowed_mem >> chunksize. But if "batches" can be submitted then it might work?

tomwhite commented Aug 1, 2023

I think this makes sense - at a high level it's like what rechunker does with write_chunk, which can be a multiple of the output array's target_chunks (see the diagram at https://rechunker.readthedocs.io/en/latest/algorithm.html).

I think there are a couple of tricky parts:

  1. Making sure that memory bounds are adhered to. It's not just a question of making sure that Zarr chunks are smaller than allowed_mem: the Zarr compressed chunks and the nature of the operation matter too. There is some explanation here, but it might be worth generalising this, perhaps by modelling the different parts of memory usage in a class (see the sketch after this list). Then combining operations would use this class to see how memory usage would change according to the model.
  2. Finding the right number of input chunks to read from. For example, in principle you could write a reduction to use a single task to read 1000 input chunks, but that would probably be slower than using three rounds where each task reads 10 chunks. So there's probably some optimal "fan-in".
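On the first point, here's a minimal sketch of what modelling the different parts of memory usage in a class might look like (the field names and the fusion rule are assumptions for illustration, not Cubed's actual model):

from dataclasses import dataclass

@dataclass
class TaskMemoryModel:
    """Crude, illustrative model of a task's peak memory usage."""
    input_chunks_nbytes: int       # uncompressed size of all chunks the task reads
    compressed_copies_nbytes: int  # compressed buffers held during Zarr reads/writes
    operation_nbytes: int          # working memory of the operation itself
    output_chunk_nbytes: int       # uncompressed size of the chunk written

    def peak(self) -> int:
        return (self.input_chunks_nbytes + self.compressed_copies_nbytes
                + self.operation_nbytes + self.output_chunk_nbytes)

    def fits(self, allowed_mem: int) -> bool:
        return self.peak() <= allowed_mem

def fuse(first: TaskMemoryModel, second: TaskMemoryModel) -> TaskMemoryModel:
    """Pessimistic model of fusing two tasks: the fused task reads the first
    task's inputs, writes the second task's output, and is charged for both
    operations' working memory at once."""
    return TaskMemoryModel(
        input_chunks_nbytes=first.input_chunks_nbytes,
        compressed_copies_nbytes=max(first.compressed_copies_nbytes,
                                     second.compressed_copies_nbytes),
        operation_nbytes=first.operation_nbytes + second.operation_nbytes,
        output_chunk_nbytes=second.output_chunk_nbytes,
    )

The optimizer would then only perform a candidate fusion if fuse(a, b).fits(allowed_mem).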

I wonder if the concat case is sufficiently different from reduction (matmul), since it does weird things across block boundaries, to make it worth focusing on them separately?

A couple of other things I noticed:

Currently Cubed's optimization pass fuses some blockwise operations together, but it can only fuse blockwise operations that have the same number of tasks.

Strictly speaking we should check that numblocks is the same, since that is what blockwise cares about, rather than the number of tasks, but it's not clear to me what, if anything, this currently breaks in practice.

(I don't really understand what all the side inputs are in these graphs - I hope they don't invalidate what I'm suggesting!)

They are used as inputs, but do not have any chunks written to storage. In the case of the empty array it is just the Zarr metadata that is written to storage, and for the offsets array it is entirely virtual (i.e. in-memory). I don't think they invalidate anything you are suggesting. In fact, I think they could both be virtual (#247), and possibly even removed from the DAG (maybe by changing the blockwise function to incorporate them using partial?).
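Here's a minimal sketch of the partial idea (hypothetical function and variable names, not how Cubed's blockwise is actually wired up): instead of passing a small side input like the offsets array through the DAG as a graph input, bind it into the blockwise function so the graph only contains the real array inputs.

from functools import partial

import numpy as np

def add_block_offset(x, offsets=None, block_id=None):
    """Example blockwise function that needs a small 'side input' of offsets."""
    return x + offsets[block_id[0]]

offsets = np.array([0, 10, 20])  # entirely in-memory, never written to storage

# Bind the side input into the function rather than adding it as a DAG node.
blockwise_func = partial(add_block_offset, offsets=offsets)

print(blockwise_func(np.ones((2, 2)), block_id=(1,)))  # each element becomes 11.0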

@TomNicholas

at a high-level it's like what rechunker does with write_chunk

Oh yeah, of course it is! In that case they were trying to minimise the number of tasks, whereas in Cubed we would be trying either to minimise the number of tasks or to set the number of tasks to whatever value allows for more fusion.

I wonder if the concat case is sufficiently different from reduction (matmul) since it does weird things across block boundaries, to make it worth focusing on them separately?

Maybe? I think that it would be instructive to focus on the reduction case initially, because it would directly affect performance in many real workloads, and expose this "fan-in" question immediately. As an example:

spec = cubed.Spec(allowed_mem='80kB')

a = cubed.random.random((10000, 100), chunks=(10, 100), spec=spec)
a

[array repr screenshot]

avg = xp.max(a)
avg.visualize(optimize_graph=True)

[plan visualization]

print_num_tasks_per_pipeline(avg.plan, optimize_graph=True)
array-3470: op_name = blockwise, num_tasks = 1000
array-3474: op_name = blockwise, num_tasks = 100
array-3478: op_name = blockwise, num_tasks = 10
array-3483: op_name = blockwise, num_tasks = 1

Finding the right number of input chunks to read from

So you're saying there is a trade-off between opening many chunks in one task (which won't be parallel) and reading one chunk per task (which is embarrassingly parallel but requires waiting for disk IO). Does that mean the optimal "fan-in" is basically just "the number of chunks one task would have to attempt to read before reading them took longer than it takes to do an extra round of read/write IO to disk for one chunk"?

I think they could both be virtual (#247), and possibly even removed from the DAG

That would be nice. It would make the DAG more intuitive I think.


tomwhite commented Aug 2, 2023

Does that mean the optimal "fan-in" is basically just "the number of chunks one task would have to attempt to read before reading them took longer than it takes to do an extra round of read/write IO to disk for one chunk"?

Yes, I think so. At the moment the number of rounds is determined solely by the memory available, but what we're saying here is that there should also be some consideration of the number of chunks read per task (fan-in).
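A back-of-envelope sketch of that trade-off (the timings below are made-up parameters, not measurements): a tree reduction over n chunks with fan-in k takes ceil(log_k(n)) rounds, so increasing k reduces the number of rounds of intermediate read/write IO but increases how long each task spends reading.

import math

def reduction_time(n_chunks: int, fan_in: int,
                   read_one_chunk_s: float = 1.0,
                   round_overhead_s: float = 5.0) -> float:
    """Crude model: tasks within a round run in parallel, so each round costs
    the time to read fan_in chunks plus a fixed overhead for writing and
    re-reading the intermediate store."""
    rounds = math.ceil(math.log(n_chunks, fan_in)) if fan_in > 1 else n_chunks
    return rounds * (fan_in * read_one_chunk_s + round_overhead_s)

for k in (2, 10, 32, 100, 1000):
    print(f"fan-in {k}: {reduction_time(1000, k):.0f}s")
# With these made-up numbers the optimum is around a fan-in of 10, i.e. the
# break-even point described in the comment above.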
