Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(draft): add report= argument for uproot.dask; trigger report collection (take 2!) #1058

Merged
merged 22 commits into from
Dec 12, 2023

Conversation

douglasdavis
Copy link
Contributor

@douglasdavis douglasdavis commented Dec 7, 2023

Supersedes #1050

This again boils down to adding a report= argument, but now the implementation to actually build the report is significant changed and based on PR dask-contrib/dask-awkward#433 in dask-awkward instead of the combination of #1050 + dask-contrib/dask-awkward#415

@lgray if you could take this for a spin on something that you know will potentially have OSError raised, that would be great!

@douglasdavis
Copy link
Contributor Author

douglasdavis commented Dec 7, 2023

In this PR the failure and success reports are completely defined and used here in uproot, specifically in the __call__ implementations. There is no default provided by dask-awkward. dask-awkward just needs to know if the input layer is going to return two arrays- that is controlled with the return_report attribute on the io_func that gets passed to from_map. (on the dask-awkward side if from_map detects that the io_func it receives has io_func.return_report == True, it builds the layer as needed

As a POC I've made the fields

  • duration
  • exception
  • error message
  • args
  • kwargs

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

Here's a quick script that I've been using to check things!

from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
#from distributed import Client
import dask
#import dask_awkward as dak


if __name__ == "__main__":
    #client = Client()


    #dask.config.set({"awkward.optimization.enabled": True, "awkward.raise-failed-meta": True, "awkward.optimization.on-fail": "raise"})
    events, report = NanoEventsFactory.from_root(
        ["https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root:Events", "/not/actually/a/root/file.root:Events"],
        metadata={"dataset": "nano_dy"},
        schemaclass=NanoAODSchema,
        uproot_options={"allow_read_errors_with_report": True}
    ).events()

    pt, creport = dask.compute(events.Muon.pt, report)

(edited to follow @jpivarski's review comments)

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

Though with the latest report and the above script I get:

(coffea-dev) lgray@Lindseys-MacBook-Pro coffea % python -i report_play.py
/Users/lgray/coffea-dev/coffea/src/coffea/nanoevents/schemas/nanoaod.py:243: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8
  warnings.warn(
Traceback (most recent call last):
  File "report_play.py", line 12, in <module>
    events, report = NanoEventsFactory.from_root(
  File "/Users/lgray/coffea-dev/dask-awkward/src/dask_awkward/lib/core.py", line 1006, in __iter__
    raise NotImplementedError(
NotImplementedError: Iteration over a Dask Awkward collection is not supported.
A suggested alternative: define a function which iterates over
an awkward array and use that function with map_partitions.

Whereas the last report version completed just fine.

def time_it(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
start = time.time()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use time.monotonic() for start and stop!

Copy link
Contributor

@lgray lgray Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the time package has some odd aspects. time.time() can't be used with time.monotonic(), they measure from different starting points, the latter to ensure consistency.

So need time.time() for the start time since epoch and then a consistent duration is formed from pairs of time.monotonic().

Annoying, right?

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

The latest whoops commits gives me:

(coffea-dev) lgray@Lindseys-MacBook-Pro coffea % python -i report_play.py                                 
/Users/lgray/coffea-dev/coffea/src/coffea/nanoevents/schemas/nanoaod.py:243: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8
  warnings.warn(
Traceback (most recent call last):
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 114, in _open
    self._file = numpy.memmap(self._file_path, dtype=self._dtype, mode="r")
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/numpy/core/memmap.py", line 228, in __new__
    f_ctx = open(os_fspath(filename), ('r' if mode == 'c' else mode)+'b')
FileNotFoundError: [Errno 2] No such file or directory: '/not/actually/a/root/file.root'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 38, in __init__
    self._file = open(self._file_path, "rb")
FileNotFoundError: [Errno 2] No such file or directory: '/not/actually/a/root/file.root'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "report_play.py", line 19, in <module>
    pt, creport = dask.compute(events.Muon.pt, report)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/Users/lgray/miniforge3/envs/coffea-dev/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/_dask.py", line 1173, in __call__
    ttree = uproot._util.regularize_object_path(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/_util.py", line 1153, in regularize_object_path
    file = ReadOnlyFile(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/reading.py", line 588, in __init__
    self._source = Source(file_path, **self._options)
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 110, in __init__
    self._open()
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 120, in _open
    self._fallback = uproot.source.file.MultithreadedFileSource(
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 253, in __init__
    self._open()
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 258, in _open
    [FileResource(self._file_path) for x in range(self._num_workers)]
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 258, in <listcomp>
    [FileResource(self._file_path) for x in range(self._num_workers)]
  File "/Users/lgray/coffea-dev/uproot5/src/uproot/source/file.py", line 40, in __init__
    raise uproot._util._file_not_found(file_path) from err
FileNotFoundError: file not found

    '/not/actually/a/root/file.root'

Files may be specified as:
   * str/bytes: relative or absolute filesystem path or URL, without any colons
         other than Windows drive letter or URL schema.
         Examples: "rel/file.root", "C:\abs\file.root", "http://where/what.root"
   * str/bytes: same with an object-within-ROOT path, separated by a colon.
         Example: "rel/file.root:tdirectory/ttree"
   * pathlib.Path: always interpreted as a filesystem path or URL only (no
         object-within-ROOT path), regardless of whether there are any colons.
         Examples: Path("rel:/file.root"), Path("/abs/path:stuff.root")

Functions that accept many files (uproot.iterate, etc.) also allow:
   * glob syntax in str/bytes and pathlib.Path.
         Examples: Path("rel/*.root"), "/abs/*.root:tdirectory/ttree"
   * dict: keys are filesystem paths, values are objects-within-ROOT paths.
         Example: {"/data_v1/*.root": "ttree_v1", "/data_v2/*.root": "ttree_v2"}
   * already-open TTree objects.
   * iterables of the above.

So somehow it's not catching the FileNotFound/OSError correctly?

@douglasdavis
Copy link
Contributor Author

I think the exception is getting raised before dask-awkward gets to the place where it allows exceptions (at the __call__) I think uproot is checking for a file existence before asking for any data (so before compute)?

I'm still digging/working on this!

@douglasdavis
Copy link
Contributor Author

Some good news here though:

In [11]: import uproot
    ...: import dask
    ...: files = ["/Users/ddavis/software/repos/coffea/tests/samples/nano_dy.root:Events"]
    ...: thing, report = uproot.dask(files, report=True)
    ...: a, b = dask.compute(thing, report)
    ...: 
    ...: 

In [12]: a
Out[12]: <Array [{run: 1, ...}, ..., {run: 1, ...}] type='40 * {run: uint32, luminos...'>

In [13]: b.tolist()
Out[13]: 
[{'duration': -1701769030.9317346,
  'args': ["<TTree 'Events' (1499 branches) at 0x000176803c10>", '0', '40'],
  'kwargs': [],
  'exception': None,
  'message': None,
  'fqdn': None,
  'hostname': None}]

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

I use open_files=False in uproot.dask so it shouldn't be trying to open anything!

@douglasdavis
Copy link
Contributor Author

douglasdavis commented Dec 7, 2023

Ah you're right. I didnt catch the whole traceback :) there is some uproot code getting used in __call__ before the exception check, just need to reorder some things

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

import uproot
import dask

if __name__ == "__main__":
    events, report = uproot.dask(
        ["tests/samples/nano_dy.root:Events", "/not/actually/a/root/file.root:Events"],
        allow_read_errors_with_report=True,
        open_files=False,
    )

    pt, creport = dask.compute(events.Muon_pt, report)

Here's a non-coffea script. (edit for comments from @jpivarski's review)

@douglasdavis
Copy link
Contributor Author

douglasdavis commented Dec 7, 2023

OK just got the coffea-using script to work. Only other Q I had is above -- about how the duration should be implemented I see you answered it! GH is appearing flaky for me 🤔

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

commented in that review thread but putting it here:

call_time = time.time_ns() # integer time since epoch, convertible to a datetime
start = time.monotonic() # since start of program for calibration purposes
call(stuff)
stop = time.monotonic()
return call_time, stop - start

@douglasdavis douglasdavis force-pushed the add-dask-awkward-from-map-report-2 branch from ad45af6 to 582119b Compare December 7, 2023 20:11
@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

and, more or less, we care about call time if it fails and duration if it succeeds!

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

very nice:

(coffea-dev) lgray@Lindseys-MacBook-Pro coffea % python -i report_play.py                                 
/Users/lgray/coffea-dev/coffea/src/coffea/nanoevents/schemas/nanoaod.py:243: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8
  warnings.warn(
>>> import pprint
>>> pprint.pprint(creport.to_list())
[{'args': ["'https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root'",
           "'Events'",
           '0',
           '1',
           'False'],
  'call_time': None,
  'duration': 1.389443333,
  'exception': None,
  'fqdn': None,
  'hostname': None,
  'kwargs': [],
  'message': None},
 {'args': ["'/not/actually/a/root/file.root'", "'Events'", '0', '1', 'False'],
  'call_time': 1701980886668361000,
  'duration': None,
  'exception': 'FileNotFoundError',
  'fqdn': '1.0.0.127.in-addr.arpa',
  'hostname': 'Lindseys-MacBook-Pro.local',
  'kwargs': [],
  'message': 'file not found\n'
             '\n'
             "    '/not/actually/a/root/file.root'\n"
             '\n'
             'Files may be specified as:\n'
             '   * str/bytes: relative or absolute filesystem path or URL, '
             'without any colons\n'
             '         other than Windows drive letter or URL schema.\n'
             '         Examples: "rel/file.root", "C:\\abs\\file.root", '
             '"http://where/what.root"\n'
             '   * str/bytes: same with an object-within-ROOT path, separated '
             'by a colon.\n'
             '         Example: "rel/file.root:tdirectory/ttree"\n'
             '   * pathlib.Path: always interpreted as a filesystem path or '
             'URL only (no\n'
             '         object-within-ROOT path), regardless of whether there '
             'are any colons.\n'
             '         Examples: Path("rel:/file.root"), '
             'Path("/abs/path:stuff.root")\n'
             '\n'
             'Functions that accept many files (uproot.iterate, etc.) also '
             'allow:\n'
             '   * glob syntax in str/bytes and pathlib.Path.\n'
             '         Examples: Path("rel/*.root"), '
             '"/abs/*.root:tdirectory/ttree"\n'
             '   * dict: keys are filesystem paths, values are '
             'objects-within-ROOT paths.\n'
             '         Example: {"/data_v1/*.root": "ttree_v1", '
             '"/data_v2/*.root": "ttree_v2"}\n'
             '   * already-open TTree objects.\n'
             '   * iterables of the above.\n'}]
>>> from datetime import datetime
>>> datetime.utcfromtimestamp(creport.to_list()[1]["call_time"]/1e9).strftime('%Y-%m-%d %H:%M:%S')
'2023-12-07 20:28:06'

@douglasdavis
Copy link
Contributor Author

OK I think this is in a pretty good place now

@lgray
Copy link
Contributor

lgray commented Dec 7, 2023

Yep - this seems to be in the right place!

@douglasdavis
Copy link
Contributor Author

Need to polish the PR in dask-awkward and that'll be in the next dask-awkward release

@lgray
Copy link
Contributor

lgray commented Dec 8, 2023

@btovar could you try this with taskvine to make sure the report works with very different backends?

@btovar
Copy link
Contributor

btovar commented Dec 8, 2023

@lgray will do!

@jpivarski jpivarski added the next-release Required for the next release label Dec 8, 2023
@jpivarski jpivarski marked this pull request as ready for review December 8, 2023 15:27
Copy link
Member

@jpivarski jpivarski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@douglasdavis gave me a walkthrough of this PR in our meeting, and I think it looks good, with just a few changes before merging.

  • For the uproot.dask user, let's change the name from report: bool = False to allow_read_errors_with_report: bool = False. Internally, the argument can be passed around as report: bool. This is a long argument name, but I think that extra context is needed, and not just in a docstring. This makes user scripts more self-documenting.
  • The args and kwargs fields in the report are too generic: the arguments in question here are internal within Uproot, but "args and kwargs" suggests that they're user-visible arguments, which can be confusing. They're generic arguments to dask-awkward, but in Uproot, when making the report, they can be named file_path, object_path, entry_start, entry_stop, etc., which are the user-visible names for these things.

Other than that, we're just waiting for @btovar's test, and then this is done and can be merged. It will go into 5.2.0 on Wednesday.

@btovar
Copy link
Contributor

btovar commented Dec 8, 2023

Currently I'm getting:

NotImplementedError: Iteration over a Dask Awkward collection is not supported.
A suggested alternative: define a function which iterates over
an awkward array and use that function with map_partitions.

versions: awkward-2.5.1rc1 coffea-2023.12.0rc0 dask-awkward-2023.12.0
uproot commit: cbad36e

I get that error even with the example above without coffea. Let me try with an env just with this uproot commit.

import uproot
import dask

if __name__ == "__main__":
    events, report = uproot.dask(
        ["tests/samples/nano_dy.root:Events", "/not/actually/a/root/file.root:Events"],
        report=True,
        open_files=False,
    )

    pt, creport = dask.compute(events.Muon_pt, report)

@douglasdavis
Copy link
Contributor Author

You'll need to checkout the branch @ dask-contrib/dask-awkward#433

@btovar
Copy link
Contributor

btovar commented Dec 8, 2023

I'm still getting the same error from a clean env. I'm checking out the given commits for uproot5 (cbad36e) and dask-awkward (d64f8a5aacf1f995c7aad5a380b9b7e406fd0194), and doing a pip install -e . in a clean env. This is the output of pip list:

Package            Version
------------------ -----------------------
awkward            2.5.1rc1
awkward-cpp        26
click              8.1.7
cloudpickle        3.0.0
dask               2023.12.0
dask-awkward       2023.12.1.dev4+gd64f8a5
fsspec             2023.12.1
importlib-metadata 7.0.0
locket             1.0.0
numpy              1.26.2
packaging          23.2
partd              1.4.1
pip                23.3.1
PyYAML             6.0.1
setuptools         68.2.2
toolz              0.12.0
typing_extensions  4.8.0
uproot             5.1.2
wheel              0.42.0
zipp               3.17.0

@douglasdavis
Copy link
Contributor Author

douglasdavis commented Dec 8, 2023

Ah, latest uproot commit requires

- report=True,
+ allow_read_errors_with_report=True,

in uproot.dask.

So coffea would need from_root(..., uproot_options={"allow_read_errors_with_report": True})

@lgray
Copy link
Contributor

lgray commented Dec 8, 2023

Yeah my bad for not updating the stuff in the comments above, have been traveling today - done now.

@btovar
Copy link
Contributor

btovar commented Dec 9, 2023

Thanks for your help, it seems to be working now. I tried both with local and remote taskvine workers.

@douglasdavis
Copy link
Contributor Author

I've added a simple test and the upstream feature is now available in the latest version of dask-awkward (2023.12.1), so I don't plan to add anything unless something needs polishing or is missing upon review

@lobis lobis self-requested a review December 12, 2023 00:34
@lobis lobis merged commit 66d1feb into scikit-hep:main Dec 12, 2023
20 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
next-release Required for the next release
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants