
Switch to different, stable hash algorithm in Bag #6723

Closed
itamarst opened this issue Oct 9, 2020 · 14 comments · Fixed by #10734
@itamarst commented Oct 9, 2020:

In #6640, it was pointed out that groupby() doesn't work on non-numeric keys. The issue: hash() is used to group, and hash() gives different results in different Python processes. The solution there was to set a fixed hash seed.
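For illustration, here's a minimal sketch of the underlying behavior (assumes a standard CPython install; not from the original report):

import os
import subprocess
import sys

# hash() of a str varies between interpreter runs unless PYTHONHASHSEED
# is pinned, which is exactly what breaks cross-process grouping.
cmd = [sys.executable, "-c", "print(hash('lalala'))"]
print(subprocess.check_output(cmd))  # varies run to run
print(subprocess.check_output(cmd))  # almost certainly different

env = {**os.environ, "PYTHONHASHSEED": "0"}
print(subprocess.check_output(cmd, env=env))  # stable
print(subprocess.check_output(cmd, env=env))  # same value again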

Unfortunately, Distributed has the same issue (dask/distributed#4141), and there it's harder to solve: e.g. a distributed worker started without a nanny has no good way to set a consistent seed.

Given that:

  1. Until now groupby was broken for non-numerics and no one noticed. So probably not a huge use case.
  2. It's hard to solve this with hash().

A better approach might be a different hash algorithm. For example, https://deepdiff.readthedocs.io/en/latest/deephash.html.

deephash seems promising:

from deepdiff import DeepHash

class CustomObj:

    def __init__(self, x):
        self.x = x

    def __repr__(self):
        return f"CustomObj({self.x})"


objects = [
    125,
    "lalala",
    (12, 17),
    CustomObj(17),
    CustomObj(17),
    CustomObj(25),
]

for o in objects:
    print(repr(o), "has hash", DeepHash(o)[o])

Results in:

$ python example.py 
125 has hash 08823c686054787db6befb006d6846453fd5a3cbdaa8d1c4d2f0052a227ef204
'lalala' has hash 2307d4a08f50b743ec806101fe5fc4664e0b83c6c948647436f932ab38daadd3
(12, 17) has hash c7129efa5c7d19b0efabda722ae4cd8d022540b31e8160f9c181d163ce4f1bf6
CustomObj(17) has hash 477631cfc0315644152933579c23ed0c9f27743d632613e86eb7e271f6fc86e4
CustomObj(17) has hash 477631cfc0315644152933579c23ed0c9f27743d632613e86eb7e271f6fc86e4
CustomObj(25) has hash 8fa8427d493399d6aac3166331d78e9e2a2ba608c2b3c92a131ef0cf8882be14

Downsides:

  1. This hashing approach, serializing the object to a string and then hashing the string, is probably much more expensive than hash().
  2. Custom __hash__ won't work.

That said, Bag has used hash() this way for at least four years, and I assume it's been broken with Distributed that whole time, so the second downside is probably less relevant. I'm not sure about the performance aspect, though.

@mrocklin commented:
cc @eriknw @jcrist who might have experience here

@itamarst commented:
Another, faster approach: don't support arbitrary objects; only explicitly support specific types, i.e. enums/int/float/str/tuple/bytes and presumably the NumPy numeric types, plus probably five other types I'm forgetting.
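A minimal sketch of that idea (hypothetical helper, not part of Dask; unsupported types simply raise):

import hashlib

def stable_hash(obj) -> str:
    # Dispatch on an explicit allowlist of types; every branch feeds
    # type-tagged bytes into a deterministic hash.
    if isinstance(obj, bool):  # must come before int (bool subclasses int)
        data = b"bool:" + str(obj).encode()
    elif isinstance(obj, int):
        data = b"int:" + str(obj).encode()
    elif isinstance(obj, float):
        data = b"float:" + obj.hex().encode()
    elif isinstance(obj, str):
        data = b"str:" + obj.encode("utf-8")
    elif isinstance(obj, bytes):
        data = b"bytes:" + obj
    elif isinstance(obj, tuple):
        data = b"tuple:" + b",".join(stable_hash(el).encode() for el in obj)
    else:
        raise TypeError(f"unsupported key type: {type(obj).__name__}")
    return hashlib.sha1(data).hexdigest()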

@jcrist commented Oct 12, 2020:

A different option would be to hijack the pickle infrastructure with a hash function. This has the benefit of supporting arbitrary Python types as long as they're pickleable (and all types used as results in Dask should be pickleable), while also performing acceptably on large objects. The overhead here is mostly in setting up the pickler per call, so small objects (ints, short strings) would be slower, while nested objects should be about the same. There are faster hash algorithms than those in hashlib (and we don't need a cryptographic hash here), but this was quick to write up.

import cloudpickle
import hashlib

class HashFil:
    """File-like sink that feeds everything written to it into a hash."""

    def __init__(self):
        self.hash = hashlib.sha1()

    def write(self, buf):
        self.hash.update(buf)
        return len(buf)

    def buffer_callback(self, buf):
        self.write(buf.raw())


def custom_hash(x):
    fil = HashFil()
    pickler = cloudpickle.CloudPickler(
        fil, protocol=5, buffer_callback=fil.buffer_callback
    )
    pickler.dump(x)
    return fil.hash.hexdigest()

Timing in IPython:

%timeit custom_hash([1, 2, 3, "hello"])
3.56 µs ± 18.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Perf could be further improved by special casing a few common types (str, int, float, bytes, ...) and having a fast path for those.
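For instance, a hypothetical fast path wrapping the custom_hash() above might look like:

import hashlib

def fast_hash(x):
    # Special-case a few common types; exact type checks avoid
    # surprises from subclasses (e.g. bool is a subclass of int).
    if type(x) is str:
        return hashlib.sha1(b"str:" + x.encode("utf-8")).hexdigest()
    if type(x) is bytes:
        return hashlib.sha1(b"bytes:" + x).hexdigest()
    if type(x) is int:
        return hashlib.sha1(b"int:" + str(x).encode()).hexdigest()
    return custom_hash(x)  # generic pickle-based fallback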

@itamarst commented:
The worry about pickle is that some implementations might use non-deterministic iteration, e.g. just iterating over an internal dict... and then we're back to the same problem, albeit in a much smaller set of cases.
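A small illustration of the order-dependence that makes this worrying:

import pickle

# Equal dicts built in different insertion orders pickle to different
# bytes, so a pickle-based hash could disagree on equal values.
a = {"x": 1, "y": 2}
b = {"y": 2, "x": 1}
assert a == b
print(pickle.dumps(a) == pickle.dumps(b))  # False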

@itamarst commented:
Although... dict iteration is actually deterministic these days, isn't it? So maybe it's fine.

@jsignell added the bag label Oct 13, 2020
@jsignell commented:
The joblib solution seems like a good one. There is also some discussion of hashing going on in #6844 and #6843.
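For reference, joblib exposes this as a single function (pickle-based traversal with NumPy special-casing), and its output is stable across processes:

import joblib

print(joblib.hash(("a", 1, None)))                       # md5 hex digest by default
print(joblib.hash({"x": [1.5, "y"]}, hash_name="sha1"))  # or sha1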

@cisaacstern commented Dec 9, 2023:

Thanks all for your work on this so far! Reviving this thread as today @TheNeuralBit discovered that it is the root cause of apache/beam#29365 (thanks Brian!). To summarize the context there, and my motivations:

  1. Apache Beam is a framework for embarrassingly-parallel data processing, which flexibly deploys to various "runners"
  2. Pangeo Forge leverages Beam for cloud-optimization of scientific data (I am a core developer on this project)
  3. Beam can currently deploy to a minimal DaskRunner. This runner is implemented with Dask Bag, however, so the groupby issues discussed here prevent it from grouping on string or other non-numeric keys.[1]
  4. From my perspective (along with many other Pangeo Forgeans), if this DaskRunner were developed into a fully-functional state, it would serve as an ideal onramp for both (a) existing Dask users to run Beam pipelines; and (b) existing Beam users (of which there are a lot!) to become Dask cluster enthusiasts![2]

So with that background out of the way, I'd love to re-open the discussion here as to what folks see as the most viable path forward.

I would gladly contribute to or lead a PR on this topic (or defer to someone else who is motivated to do so), but it looks like the first step is developing a bit more consensus about the path forward.

P.S. @jacobtomlinson and I plan to present a bit on the DaskRunner at the upcoming Dask Demo Day, and I will update our draft presentation to include a mention of this issue.

Footnotes

  1. String keys are used in Beam's groupby tutorial, and other non-numeric keys are also used internally throughout Beam, so this is a non-negotiable if the DaskRunner is to mature into a fully-fledged Beam deployment target.

  2. Beam currently lacks any comparably easy-to-deploy, performant target for running Python-based pipelines on a single machine (their single-machine runner is explicitly described as being for testing data serialization only, and does not scale), on HPC, or frankly even on AWS. (The next-best AWS option is Apache Spark or Flink, but these come with significant (ahem, Java) overhead and are non-trivial for even highly competent Pangeo/Python users to navigate.)

@itamarst commented Dec 9, 2023:

Notes on joblib.hashing

joblib implements the recursive Python object traversal using pickle, with a special case for NumPy: https://github.com/joblib/joblib/blob/master/joblib/hashing.py. Some problems:

Notes on the hash algorithm (separate from the traversal method)

joblib uses md5/sha, which are likely somewhat slower (they're cryptographic, which isn't necessary here) but might also be fast enough. There are hash algorithms specifically designed for stability across machines and time, often aimed at very fast hashing of larger amounts of data where the output will be used as a key in persistent storage or distributed systems. HighwayHash is a good example: https://github.com/google/highwayhash#versioning-and-stability ("input -> hash mapping will not change"); I'm not sure if it takes a seed, but if so we'd just need to fix one. There are other alternatives as well.
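As a stdlib-only illustration of the fixed-seed idea (not HighwayHash itself, which needs a third-party binding), keyed BLAKE2 behaves this way:

import hashlib

# The key plays the role of a fixed seed; the input -> digest mapping
# is then stable across machines, processes, and Python versions.
digest = hashlib.blake2b(b"some bytes", key=b"fixed-seed", digest_size=8)
print(digest.hexdigest())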

Another, perhaps unworkable option: convert Python hash() to a stable hash

Given the seed, which you can extract from CPython internals, it might be mathematically possible to take the output of hash(obj) and turn it into a stable value by undoing the impact of the seed.

Another option: a deterministic/canonical serialization format.

Some serialization formats are specifically designed to be deterministic for a given input, in order to allow cryptographic hashing/signatures and comparison. E.g. https://borsh.io/ is one, with a Python wrapper at https://pypi.org/project/borsh-python/. Unfortunately it requires a schema... so you'd need a self-describing (i.e. messages don't require a schema) deterministic serialization format. Deterministic CBOR is in a draft spec and has at least a Rust implementation: https://docs.rs/dcbor/latest/dcbor/, so that might be a possibility. For best performance what you really want is a streaming self-describing deterministic serializer; dcbor, at least, isn't streaming.

Given a deterministic serialization, you can then just use a hash function of your choice (md5, HighwayHash, whatever) to get the hash.
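As a toy stand-in for such a format, canonical JSON (sorted keys, fixed separators) plus a hash of choice shows the shape of the idea, though it only covers JSON-able types:

import hashlib
import json

def canonical_hash(obj) -> str:
    # Deterministic serialization first, then any hash function.
    data = json.dumps(obj, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.md5(data).hexdigest()

print(canonical_hash({"b": 1, "a": [None, 2.5]}))  # stable everywhere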

@TheNeuralBit commented Dec 9, 2023 via email

@cisaacstern commented:
Finding a solution here is still worthwhile IMO, though it will presumably unfold over a somewhat longer timescale.

In the meantime, I'm pleased to report that dask/distributed#8400 does unblock many use cases, including the linked Beam issue.

@mrocklin commented Dec 16, 2023 via email

@cisaacstern commented Dec 21, 2023:

"dask/distributed#8400 does unblock many use cases, including the linked Beam issue"

I spoke a bit too soon here.

As described at the bottom of apache/beam#29802 (comment), while dask/distributed#8400 did resolve non-deterministic hashing of strings, I subsequently came to understand that it does not resolve non-deterministic hashing of None. hash(None) is deterministic in Python 3.12, but AFAICT there is no way to make it deterministic in Python < 3.12.

hash(None) being non-deterministic in Python < 3.12 means that any otherwise-hashable container type that contains None will also hash non-deterministically between Python processes, and will therefore be unusable as a key for Bag.groupby in a distributed context. This includes, in the simplest case, (None,), but also extends to frozen dataclasses, etc.
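To make the failure mode concrete (behavior on CPython <= 3.11):

# hash(None) is derived from the object's address on <= 3.11, and
# PYTHONHASHSEED does not affect it; containers inherit the problem.
print(hash(None))     # differs from process to process
print(hash((None,)))  # therefore this tuple's hash differs too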

This happens to be the next blocker for me on my motivating Beam issue, but apart from that specific case, I believe this is sufficiently problematic as to warrant moving forward with a fix for this issue.

In terms of a specific proposal, I'm curious about others' thoughts on leveraging Dask's existing tokenize function. This would seem to have many advantages.

There would also be an opportunity to fast-path types that are known to hash deterministically (e.g. ints). Here's a draft PR that sketches out this idea for discussion purposes: #10734.
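A quick look at what that could mean in practice (dask.base.tokenize is the existing function; exact determinism guarantees depend on the Dask version):

from dask.base import tokenize

# tokenize() already produces deterministic tokens for common builtin
# and NumPy types; it is what Dask uses to build task keys.
print(tokenize((None, "a", 1)))
print(tokenize({"x": [1, 2, None]}))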

@cisaacstern commented:
Ping for those following here: I've just opened my proposed fix for this issue, #10734, for review.
