
With dask cluster uproot.dask breaks #1102

Closed
ivukotic opened this issue Jan 25, 2024 · 7 comments · Fixed by #1103
Labels
bug The problem described is something that must be fixed

Comments

@ivukotic
I can read a few branches with uproot.dask no problem, as long as I don't have a dask cluster. With a distributed dask cluster it breaks. Here is the simplest reproducible example. The file is already cached, so no authentication is needed.

from dask.distributed import Client
client = Client()

import dask_awkward as dak
import uproot

def my_name_filter(name):
    return name in [
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
    ]

tree = uproot.dask(
    [{'root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/mc20_13TeV/c3/e9/DAOD_PHYSLITE.34869306._000001.pool.root.1': 'CollectionTree'}],
    filter_name=my_name_filter,
)

ak_arr = tree.compute()
ak_arr.show()

Here is the error:

2024-01-25 20:16:57,382 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fc1c118b550>
 0. from-uproot-8587817a131b75a5ffe8abc755185aa3
>.
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_thread.lock' object
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
     62 try:
---> 63     result = pickle.dumps(x, **dump_kwargs)
     64 except Exception:

TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
     67 buffers.clear()
---> 68 pickler.dump(x)
     69 result = f.getvalue()

TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/distributed/protocol/serialize.py:353, in serialize(x, serializers, on_error, context, iterate_collection)
    352 try:
--> 353     header, frames = dumps(x, context=context) if wants_context else dumps(x)
    354     header["serializer"] = name

File /venv/lib/python3.9/site-packages/distributed/protocol/serialize.py:76, in pickle_dumps(x, context)
     74     writeable.append(not f.readonly)
---> 76 frames[0] = pickle.dumps(
     77     x,
     78     buffer_callback=buffer_callback,
     79     protocol=context.get("pickle-protocol", None) if context else None,
     80 )
     81 header = {
     82     "serializer": "pickle",
     83     "writeable": tuple(writeable),
     84 }

File /venv/lib/python3.9/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
     80     buffers.clear()
---> 81     result = cloudpickle.dumps(x, **dump_kwargs)
     82 except Exception:

File /venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
   1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
   1480 return file.getvalue()

File /venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
   1244 try:
-> 1245     return super().dump(obj)
   1246 except RuntimeError as e:

TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[3], line 16
      3     return name in [
      4         "AnalysisElectronsAuxDyn.pt",
      5         "AnalysisElectronsAuxDyn.eta",
      6         "AnalysisElectronsAuxDyn.phi",
      7         "AnalysisElectronsAuxDyn.m",
      8     ]
     10 tree = uproot.dask(
     11     [{'root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/mc20_13TeV/c3/e9/DAOD_PHYSLITE.34869306._000001.pool.root.1': 'CollectionTree'}],
     12     filter_name=my_name_filter,
     13     # library='ak'
     14 )
---> 16 ak_arr = tree.compute()
     17 ak_arr.show()

File /venv/lib/python3.9/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320 
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File /venv/lib/python3.9/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /venv/lib/python3.9/site-packages/distributed/protocol/serialize.py:379, in serialize(x, serializers, on_error, context, iterate_collection)
    377     except Exception:
    378         raise TypeError(msg) from exc
--> 379     raise TypeError(msg, str_x) from exc
    380 else:  # pragma: nocover
    381     raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7fc1c118b550>\n 0. from-uproot-8587817a131b75a5ffe8abc755185aa3\n>')
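The failure at the bottom of that traceback can be reproduced without dask or uproot at all; Python's pickle refuses lock objects outright. A minimal stdlib-only sketch:

```python
import pickle
import threading

# A lock wraps OS-level state that cannot be serialized, so pickling
# one raises the same TypeError seen in the dask traceback above.
try:
    pickle.dumps(threading.Lock())
    message = ""
except TypeError as exc:
    message = str(exc)
    print(message)  # cannot pickle '_thread.lock' object
```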
@ivukotic ivukotic added the bug (unverified) The problem described would be a bug, but needs to be triaged label Jan 25, 2024
@jpivarski
Member

@martindurant (and @douglasdavis), should this be a dask-awkward issue? (I don't have permissions to transfer it.)

@martindurant

It says that the graph contains a lock object. That must have been introduced in the from_map call that I think uproot uses - perhaps proxying an open file? Printing out the details within the graph or translating it to a low-level-graph might help.

@jpivarski
Member

It says that the graph contains a lock object.

Uh oh, I missed that. (I was looking at the bottom of the stack trace.) I'll figure out what that lock is for.

@martindurant
Copy link

I wish we had a better tool to tell you where in the object the unpicklable thing lives
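No such tool ships with dask, but a rough stdlib-only diagnostic along these lines (a hypothetical helper, `find_unpicklable`, written for this sketch) can narrow down where the unpicklable object lives by probing attributes and containers recursively:

```python
import pickle
import threading

def find_unpicklable(obj, path="obj", seen=None):
    """Return the paths of sub-objects that fail to pickle.
    Quick diagnostic sketch, not a dask utility."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return []
    seen.add(id(obj))
    try:
        pickle.dumps(obj)
        return []  # this subtree pickles fine
    except Exception:
        pass
    bad = [path]
    # Descend into attributes or container items to localize the culprit.
    if hasattr(obj, "__dict__"):
        children = [(f"{path}.{k}", v) for k, v in vars(obj).items()]
    elif isinstance(obj, dict):
        children = [(f"{path}[{k!r}]", v) for k, v in obj.items()]
    elif isinstance(obj, (list, tuple)):
        children = [(f"{path}[{i}]", v) for i, v in enumerate(obj)]
    else:
        children = []
    for child_path, child in children:
        bad.extend(find_unpicklable(child, child_path, seen))
    return bad

class Holder:
    def __init__(self):
        self.ok = 42
        self.lock = threading.Lock()

result = find_unpicklable(Holder())
print(result)  # ['obj', 'obj.lock']
```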

@jpivarski
Member

There's no issue in a random ROOT file,

>>> import uproot, skhep_testdata
>>> from dask.distributed import Client
>>> client = Client()
>>> 
>>> a = uproot.dask({skhep_testdata.data_path("uproot-HZZ.root"): "events"})
>>> a.compute()
<Array [{NJet: 0, Jet_Px: [], ...}, ..., {...}] type='2421 * {NJet: int32, ...'>

so it must be something special in DAOD_PHYSLITE. Yes, it is.

@jpivarski
Member

jpivarski commented Jan 25, 2024

It's this one:

self._embedded_baskets_lock = threading.Lock()

The thing that's special about this file (these branches, to be specific) is that it has incompletely written ("embedded") TBaskets, which have to be read single-threaded because reading them mutates the TBranch object.

I'll adapt the __setstate__/__getstate__ to drop and recreate the lock on pickling.
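The standard pattern for that (sketched below on a stand-in class, not uproot's actual TBranch code) is to drop the lock in `__getstate__` and recreate a fresh one in `__setstate__`:

```python
import pickle
import threading

class BranchLike:
    """Stand-in class illustrating the drop-and-recreate-lock pattern."""

    def __init__(self):
        self._embedded_baskets_lock = threading.Lock()
        self.data = [1, 2, 3]

    def __getstate__(self):
        state = self.__dict__.copy()
        # Locks cannot be pickled; strip before serialization.
        del state["_embedded_baskets_lock"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Recreate a fresh lock on the unpickling side.
        self._embedded_baskets_lock = threading.Lock()

roundtripped = pickle.loads(pickle.dumps(BranchLike()))
print(roundtripped.data)  # [1, 2, 3]
```

The recreated lock is unlocked and independent per process, which is fine here: its only job is to serialize reads within one worker.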

@jpivarski jpivarski linked a pull request Jan 25, 2024 that will close this issue
@jpivarski
Member

PR #1103 fixes this.

@jpivarski jpivarski added bug The problem described is something that must be fixed and removed bug (unverified) The problem described would be a bug, but needs to be triaged labels Jan 25, 2024