
Deserialization of compressed data is sluggish and causes memory flares #7433

Open
crusaderky opened this issue Dec 22, 2022 · 17 comments · Fixed by #7437

Comments

@crusaderky
Collaborator

crusaderky commented Dec 22, 2022

If lz4 is installed and a chunk can be compressed efficiently, it is compressed automatically.
This happens both in network comms (configurable) and upon spilling (not configurable).

While working on coiled/benchmarks#629, I found out that deserializing a compressed frame is abysmally slow:

(screenshot omitted)

When deserialization is triggered by network comms, it is offloaded to a separate thread for any message larger than 10 MiB (distributed.comm.offload). Note that a message will contain multiple small keys if possible.
I did not test if the GIL is released while this happens.

When deserialization is triggered by unspilling, the whole event loop is temporarily blocked.
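
For reference, these knobs live in the dask config; a minimal sketch of how one would tweak them (assuming the standard distributed.comm.* keys; spill-time compression has no equivalent switch):

import dask

# Assumed config keys: distributed.comm.compression and distributed.comm.offload.
dask.config.set({
    "distributed.comm.compression": False,  # disable compression on the wire
    "distributed.comm.offload": "10 MiB",   # (de)serialization offload threshold
})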

Microbenchmark

Serializing a 128 MiB compressible numpy array takes 9.96 ms; there's a modest overhead on top of the raw call to lz4, which takes 9.75 ms.
Deserializing the same takes a whopping 143 ms; the raw lz4 decompress alone takes an already huge 88 ms.

At the very least we should be able to trim down the overhead. The raw decompression speed is also suspiciously slow: typically compression takes much longer than decompression, so I suspect that something is going on upstream.

import io
import lz4.frame
import numpy
from distributed.protocol import deserialize_bytes, serialize_bytelist

a1 = numpy.random.random((4096, 4096))  # 128 MiB, uncompressible
a2 = numpy.ones((4096, 4096))  # 128 MiB, highly compressible

buf1 = io.BytesIO()
buf2 = io.BytesIO()
buf1.writelines(serialize_bytelist(a1))
buf2.writelines(serialize_bytelist(a2))
b1 = buf1.getvalue()
b2 = buf2.getvalue()
b3 = lz4.frame.compress(a2)

print("uncompressible size", len(b1))
print("compressible size", len(b2))
print("raw lz4 size", len(b3))
print()

print("serialize uncompressible")
%timeit serialize_bytelist(a1)
print("serialize compressible")
%timeit serialize_bytelist(a2)
print("raw lz4 compress")
%timeit lz4.frame.compress(a2)
print()

print("deserialize uncompressible")
%timeit deserialize_bytes(b1)
print("deserialize compressible")
%timeit deserialize_bytes(b2)
print("raw lz4 decompress")
%timeit lz4.frame.decompress(b3)

Output:

uncompressible size 134217969
compressible size 526635
raw lz4 size 566291

serialize uncompressible
89.9 µs ± 15.6 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
serialize compressible
9.96 ms ± 78.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
raw lz4 compress
9.75 ms ± 115 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

deserialize uncompressible
19.2 ms ± 138 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
deserialize compressible
143 ms ± 568 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raw lz4 decompress
88.3 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

serialize_bytelist and deserialize_bytes in the code above are exactly what distributed.spill uses. However, I measured the timings in distributed.comm.tcp as well and got essentially the same number (134 ms).

@crusaderky
Collaborator Author

Notably, a lot of the tests in coiled-runtime start from da.random.random(), which produces uncompressible data, and will not show this issue.
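
For illustration only (a hypothetical example, not taken from the coiled-runtime suite), the difference between data that does and does not exercise the compression path:

import dask.array as da

# Random floats are effectively incompressible; maybe_compress will skip them.
incompressible = da.random.random((4096, 4096))

# Constant / low-entropy data compresses extremely well and hits the slow path above.
compressible = da.ones((4096, 4096))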

@shughes-uk
Contributor

shughes-uk commented Dec 22, 2022

Replicated on:

Python: 3.11.0
macOS: 13.1
arch: arm64
lz4: 4.0.2
distributed: 2022.12.1
numpy: 1.24.0
msgpack: 1.0.4

uncompressible size 134217969
compressible size 526635
raw lz4 size 566291

serialize uncompressible
39.5 µs ± 3.99 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
serialize compressible
8.35 ms ± 48.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
raw lz4 compress
7.78 ms ± 49.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

deserialize uncompressible
11.3 ms ± 577 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
deserialize compressible
70.4 ms ± 659 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raw lz4 decompress
40.6 ms ± 479 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

@crusaderky
Collaborator Author

crusaderky commented Dec 23, 2022

TL;DR: it's death by deep copy. We're spending 85 ms doing actual decompression and 49 ms doing deep copies.

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.134    0.134 distributed/protocol/serialize.py:662(deserialize_bytes)
        1    0.000    0.000    0.098    0.098 distributed/protocol/compression.py:193(decompress)
        1    0.000    0.000    0.098    0.098 distributed/protocol/compression.py:195(<listcomp>)
        2    0.098    0.049    0.098    0.049 {built-in method lz4.block._block.decompress}
        1    0.000    0.000    0.036    0.036 distributed/protocol/serialize.py:473(merge_and_deserialize)
        1    0.036    0.036    0.036    0.036 {method 'join' of 'bytearray' objects}

The 128 MiB input buffer is split into two frames (distributed.block.shard: 64 MiB).
The two calls to lz4.block._block.decompress:

  1. write the output to a local raw buffer dest:
    https://github.com/python-lz4/python-lz4/blob/79370987909663d4e6ef743762768ebf970a2383/lz4/block/_block.c#L349-L359
  2. then, they deep-copy to the output bytes object:
    https://github.com/python-lz4/python-lz4/blob/79370987909663d4e6ef743762768ebf970a2383/lz4/block/_block.c#L389
  3. The two bytes objects are not contiguous. Hence, they are deep-copied again into their final destination by merge_and_deserialize

Fixing this will require an upstream PR that lets us pass an out= parameter.
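
A rough sketch of what such an API would enable (decompress_into below is hypothetical; python-lz4 does not expose anything like it today): each shard is decompressed directly into its slice of a single preallocated buffer, so no second copy is needed to make the result contiguous.

def merge_decompress(shards, total_nbytes, decompress_into):
    # decompress_into(compressed, dest) is the hypothetical primitive: it writes
    # the decompressed bytes into the writeable buffer `dest` and returns the
    # number of bytes written.
    out = bytearray(total_nbytes)
    view = memoryview(out)
    offset = 0
    for shard in shards:
        offset += decompress_into(shard, view[offset:])
    return out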

If I increase distributed.block.shard to 128 MiB, the profile changes - but not in a very good way:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.113    0.113 distributed/protocol/serialize.py:662(deserialize_bytes)
        1    0.000    0.000    0.094    0.094 distributed/protocol/compression.py:193(decompress)
        1    0.000    0.000    0.094    0.094 distributed/protocol/compression.py:195(<listcomp>)
        1    0.094    0.094    0.094    0.094 {built-in method lz4.block._block.decompress}
        1    0.000    0.000    0.019    0.019 distributed/protocol/serialize.py:473(merge_and_deserialize)
        1    0.000    0.000    0.019    0.019 distributed/protocol/serialize.py:371(deserialize)
        1    0.000    0.000    0.019    0.019 distributed/protocol/serialize.py:56(dask_loads)
        1    0.000    0.000    0.019    0.019 distributed/utils.py:746(wrapper)
        1    0.000    0.000    0.019    0.019 distributed/protocol/numpy.py:114(deserialize_numpy_ndarray)
        1    0.000    0.000    0.019    0.019 numpy/core/_asarray.py:22(require)
        1    0.019    0.019    0.019    0.019 {method 'copy' of 'numpy.ndarray' objects}

We can see that numpy is performing a deep copy - which should not happen. So even here we should be able to shave 19 ms off the numpy deserialization and 9 ms off PyBytes_FromStringAndSize (9 ms is the duration of a raw deep copy on my computer).
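
A minimal reproduction of what I believe is happening (my reading of the profile): the array is reconstructed on top of a read-only bytes buffer, so numpy.require(..., ["W"]) has to copy it to make it writeable.

import numpy

buf = bytes(8 * 2**20)                      # read-only bytes buffer
a = numpy.frombuffer(buf, dtype="float64")  # zero-copy view, but not writeable
assert not a.flags.writeable
b = numpy.require(a, requirements=["W"])    # forces a full deep copy
assert b.flags.owndata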

@crusaderky
Collaborator Author

crusaderky commented Dec 23, 2022

Worth noting that blosc (removed in #5269) does offer deserialization to already existing memory (albeit in an unpythonic and unsafe way).
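
For reference, a sketch of what decompressing into pre-existing memory looks like with python-blosc; decompress_ptr writes through a raw pointer, which is the unpythonic and unsafe part:

import blosc
import numpy

a = numpy.ones(2**20)  # 8 MiB, highly compressible
c = blosc.compress(a.tobytes(), typesize=a.itemsize)

out = numpy.empty_like(a)  # pre-existing destination buffer
blosc.decompress_ptr(c, out.__array_interface__["data"][0])
numpy.testing.assert_array_equal(a, out)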

@crusaderky
Collaborator Author

This works, thanks to #5208:

import dask
import distributed
import numpy

c = distributed.Client(n_workers=2, memory_limit="8 GiB")
x = c.submit(numpy.random.random, 4 * 2**30 // 8, workers=[0])  # 4 GiB array on one worker
y = c.submit(lambda x: None, x, workers=[1])  # forces a network transfer to the other worker

Replace numpy.random.random with numpy.ones, and the receiving worker will be killed by the nanny.

crusaderky changed the title from "Deserialization of compressed data is sluggish" to "Deserialization of compressed data is sluggish and causes memory flares" on Dec 23, 2022
@crusaderky
Collaborator Author

TL;DR: it's death by deep copy. We're spending 85 ms doing actual decompression and 49 ms doing deep copies.

Another important note:
The actual decompression part releases the GIL. So, as long as serialization/deserialization is offloaded to another thread (#4424), it's fine.
To my understanding, the deep copies all hold the GIL while they are performed.

@mrocklin
Member

I would be curious to see if turning off compression entirely affects the benchmark suite. That might give us a better holistic understanding of how much impact this has. Is that easy to test?

@crusaderky
Collaborator Author

I would be curious to see if turning off compression entirely affects the benchmark suite. That might give us a better holistic understanding of how much impact this has. Is that easy to test?

Yes, but a lot of the data in the benchmark suite is created with numpy.random.random - which is uncompressible - so it's not necessarily (I'd dare say, unlikely to be) representative of real-world data.

@crusaderky
Collaborator Author

crusaderky commented Dec 24, 2022

I think the whole thing can be dropped from 143ms to 18ms. I think we were too hasty to drop support for blosc - the performance benefit is huge.

import pickle
from time import time

import blosc
import numpy


def compress(frame):
    if isinstance(frame, pickle.PickleBuffer):
        mv = memoryview(frame)
        return blosc.compress(mv, clevel=5, cname="blosclz", typesize=mv.itemsize)
    else:
        return blosc.compress(frame, clevel=5, cname="blosclz")

# a = numpy.linspace(0, 1, 128 * 2**20 // 8)
a = numpy.ones(128 * 2**20 // 8)

t0 = time()
frames = []
pik = pickle.dumps(a, protocol=5, buffer_callback=frames.append)
t1 = time()
frames.insert(0, pik)
print([len(memoryview(frame)) for frame in frames])
cframes = [compress(frame) for frame in frames]
print([len(frame) for frame in cframes])
t2 = time()
frames2 = [blosc.decompress(frame) for frame in cframes]
t3 = time()
a2 = pickle.loads(frames2[0], buffers=frames2[1:])
t4 = time()
numpy.testing.assert_array_equal(a, a2)

print("dumps", t1 - t0)
print("compress", t2 - t1)
print("decompress", t3 - t2)
print("loads", t4 - t3)

Output:

[120, 16777216]
[136, 542224]
dumps 0.00045180320739746094
compress 0.011289596557617188
decompress 0.012519598007202148
loads 0.0037240982055664062

Same as above, but with lz4.block.compress / lz4.block.decompress:

[120, 16777216]
[121, 526366]
dumps 0.0005145072937011719
compress 0.010733366012573242
decompress 0.09740924835205078
loads 0.006391048431396484

linspace does not compress at all with lz4, while it compresses to 6% with blosc.
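
For completeness, the lz4 numbers above were obtained by swapping the blosc calls for lz4.block, roughly like this (a sketch):

import lz4.block

def compress(frame):
    # lz4.block has no typesize/shuffle options; just hand it the raw bytes.
    return lz4.block.compress(memoryview(frame).cast("B"))

def decompress(cframe):
    return lz4.block.decompress(cframe)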


On a separate topic - what's the purpose of sharding? I can't see how it helps us avoid memory spikes, since we always have all the sharded frames in memory before we do anything with them.

@mrocklin
Member

mrocklin commented Dec 24, 2022 via email

@crusaderky
Collaborator Author

While it's true that we won't be able to see the benefits of compression, we will be able to see the costs pretty clearly.

Actually, we won't. maybe_compress tries compressing a 10 kiB sample and, if that sample doesn't shrink below 90% of its original size, it doesn't attempt to compress the rest. This check is negligible, CPU-wise.
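
Roughly, the idea is (a sketch, not the actual distributed.protocol.compression implementation):

import lz4.block

def maybe_compress_sketch(frame, compress=lz4.block.compress, sample_size=10_000):
    # Compress a small sample first; only compress the whole frame if the
    # sample shrinks below ~90% of its original size.
    data = memoryview(frame).cast("B")
    sample = bytes(data[:sample_size])
    if len(compress(sample)) > 0.9 * len(sample):
        return bytes(data)   # barely compresses: send as-is
    return compress(data)    # compresses well: compress everything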

Sharding is important as data sizes grow to 2GB, when many algorithms break down.

Are you talking about the network stack?
Do you have any specific examples in mind? I'd like to discuss this more in depth (at our next stand-up?)
If it's a networking problem, we could move sharding downstream of compression - that alone would be greatly beneficial.

@mrocklin
Member

Actually, we won't. maybe_compress tries compressing 10 kiB and, if that compresses to more than 90%, it doesn't attempt to compress the rest. This exercise is quite negligible, CPU wise.

Ah, good point

Do you have any specific examples in mind?

In [1]: import lz4.block

In [2]: lz4.block.compress(b'0' * int(3e9))
---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
Cell In [2], line 1
----> 1 lz4.block.compress(b'0' * int(3e9))

OverflowError: Input too large for LZ4 API

@crusaderky
Collaborator Author

Then I think we should seriously reconsider blosc. Besides being 5x faster, it's also the only API that allows decompressing to an already existing bytearray. With sharding upstream of compression, that's the only way to avoid deep copies.

@crusaderky
Collaborator Author

To recap the discussion: both the time spent and the memory flares are caused by the conjunction of the following:

  • The python lz4 library performs an unnecessary deep copy before it returns the array. Needs upstream PR.
  • decompression when unspilling is not offloaded out of the main thread, unlike decompression when receiving from the network
    (Asynchronous Disk Access in Workers #4424)
  • distributed used to perform a deep copy when unpickling a writeable numpy array from a read-only bytes buffer. Closed by Avoid deep copy on lz4 decompression #7437 for single-shard compressed data and by Avoid deep copy of numpy buffers on unspill #7435 for spilled uncompressed data.
  • lz4 is very slow to decompress, compared to blosc
  • sharded buffers are decompressed to separate, non-contiguous buffers which are necessarily deep-copied afterwards. To fix this, the compression library needs to support decompressing to an existing buffer. blosc already supports it; we could contribute a patch to lz4 to add the feature.

Additionally,

  • coiled_runtime CI for the most part uses uncompressible data, which does not show the problem.

@mrocklin
Member

Then I think we should seriously reconsider blosc

I don't strongly object. If we add blosc again I think that we should try to add it in a normal way similar to other compressors, rather than the way that it was implemented before, as part of the serializer. This caused frequent complications.

lz4 is very slow to decompress, compared to blosc

My recollection is that it's faster than 1GB/s, and so probably very cheap relative to other things in the pipeline. "Slower than blosc" doesn't seem motivating to me. I think that the thing to beat here is "a common bottleneck in common workloads".

coiled_runtime CI for the most part uses uncompressible data, which does not show the problem

I agree that having benchmark results would be helpful in motivating the prioritization of this work. Right now the original motivation (dot-product with spill on compressible data) feels a little niche to me given the amount of effort. Of course, if you're having fun then don't let me stop you 🙂

@mrocklin
Member

Of course, maybe this actually applies in lots of common cases (every time we move compressible data bigger than a few megabytes and every time we spill) and has been having a large but silent impact. If so, hooray 🎉. In general, +1 to benchmarking. I would hope that this would come up in some of the realistic benchmarks? Vorticity? Dataframe joins?

@crusaderky
Collaborator Author

crusaderky commented Mar 15, 2023

lz4 is very slow to decompress, compared to blosc

My recollection is that it's faster than 1GB/s

400 MiB/s compression
760 MiB/s decompression (<= 64 MiB buffers)
490 MiB/s decompression (> 64 MiB buffers)

Which is, in fact, extremely noticeable - discussion and code on #7655.
