Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

selecting a point from an mfdataset #1396

Closed
rabernat opened this issue May 2, 2017 · 12 comments
Closed

selecting a point from an mfdataset #1396

rabernat opened this issue May 2, 2017 · 12 comments

Comments

@rabernat
Copy link
Contributor

rabernat commented May 2, 2017

Sorry to be opening so many vague performance issues. I am really having a hard time with my current dataset, which is exposing certain limitations of xarray and dask in a way none of my previous work has done.

I have a directory full of netCDF4 files. There are 1754 files, each 8.1GB in size, each representing a single model timestep. So there is ~14 TB of data total. (In addition to the time-dependent output, there is a single file with information about the grid.)

Imagine I want to extract a timeseries from a single point (indexed by k, j, i) in this simulation. Without xarray, I would do something like this:

import netCDF4
ts = np.zeros(len(all_files))
for n, fname in enumerate(tqdm(all_files)):
    nc = netCDF4.Dataset(fname)
    ts[n] = nc.variables['Salt'][k, j, i]
    nc.close()

Which goes reasonably quick: tqdm gives [02:38<00:00, 11.56it/s].

I could do the same sort of loop using xarray:

import xarray as xr
ts = np.zeros(len(all_files))
for n, fname in enumerate(tqdm(all_files)):
    ds = xr.open_dataset(fname)
    ts[n] = ds['Salt'][k, j, i]
    ds.close()

Which has a <50% performance overhead: [03:29<00:00, 8.74it/s]. Totally acceptable.

Of course, what I really want is to avoid a loop and deal with the whole dataset as a single self-contained object.

ds = xr.open_mfdataset(all_files, decode_cf=False, autoclose=True)

This alone takes between 4-5 minutes to run (see #1385). If I want to print the repr, it takes another 3 minutes or so to print(ds). The full dataset looks like this:

<xarray.Dataset>
Dimensions:   (i: 2160, i_g: 2160, j: 2160, j_g: 2160, k: 90, k_l: 90, k_p1: 91, k_u: 90, time: 1752)
Coordinates:
  * j         (j) float64 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0 11.0 ...
  * k         (k) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ...
  * j_g       (j_g) float64 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0 ...
  * i         (i) int64 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 ...
  * k_p1      (k_p1) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
  * k_u       (k_u) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
  * i_g       (i_g) int64 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 ...
  * k_l       (k_l) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
  * time      (time) float64 2.592e+05 2.628e+05 2.664e+05 2.7e+05 2.736e+05 ...
Data variables:
    face      (time) int64 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 ...
    PhiBot    (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    oceQnet   (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    SIvice    (time, j_g, i) float32 0.0516454 0.0523205 0.0308559 ...
    SIhsalt   (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    oceFWflx  (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    V         (time, k, j_g, i) float32 0.0491903 0.0496442 0.0276739 ...
    iter      (time) int64 10368 10512 10656 10800 10944 11088 11232 11376 ...
    oceQsw    (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    oceTAUY   (time, j_g, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    Theta     (time, k, j, i) float32 -1.31868 -1.27825 -1.21401 -1.17964 ...
    SIhsnow   (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    U         (time, k, j, i_g) float32 0.0281392 0.0203967 0.0075199 ...
    SIheff    (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    SIuice    (time, j, i_g) float32 -0.041163 -0.0487612 -0.0614498 ...
    SIarea    (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    Salt      (time, k, j, i) float32 33.7534 33.7652 33.7755 33.7723 ...
    oceSflux  (time, j, i) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    W         (time, k_l, j, i) float32 -2.27453e-05 -2.28018e-05 ...
    oceTAUX   (time, j, i_g) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    Eta       (time, j, i) float32 -1.28886 -1.28811 -1.2871 -1.28567 ...
    YC        (j, i) float32 -57.001 -57.001 -57.001 -57.001 -57.001 -57.001 ...
    YG        (j_g, i_g) float32 -57.0066 -57.0066 -57.0066 -57.0066 ...
    XC        (j, i) float32 -15.4896 -15.4688 -15.4479 -15.4271 -15.4062 ...
    XG        (j_g, i_g) float32 -15.5 -15.4792 -15.4583 -15.4375 -15.4167 ...
    Zp1       (k_p1) float32 0.0 -1.0 -2.14 -3.44 -4.93 -6.63 -8.56 -10.76 ...
    Z         (k) float32 -0.5 -1.57 -2.79 -4.185 -5.78 -7.595 -9.66 -12.01 ...
    Zl        (k_l) float32 0.0 -1.0 -2.14 -3.44 -4.93 -6.63 -8.56 -10.76 ...
    Zu        (k_u) float32 -1.0 -2.14 -3.44 -4.93 -6.63 -8.56 -10.76 -13.26 ...
    rA        (j, i) float32 1.5528e+06 1.5528e+06 1.5528e+06 1.5528e+06 ...
    rAw       (j, i_g) float32 1.5528e+06 1.5528e+06 1.5528e+06 1.5528e+06 ...
    rAs       (j_g, i) float32 9.96921e+36 9.96921e+36 9.96921e+36 ...
    rAz       (j_g, i_g) float32 1.55245e+06 1.55245e+06 1.55245e+06 ...
    dxG       (j_g, i) float32 1261.27 1261.27 1261.27 1261.27 1261.27 ...
    dyG       (j, i_g) float32 1230.96 1230.96 1230.96 1230.96 1230.96 ...
    dxC       (j, i_g) float32 1261.46 1261.46 1261.46 1261.46 1261.46 ...
    Depth     (j, i) float32 4578.67 4611.09 4647.6 4674.88 4766.75 4782.64 ...
    dyC       (j_g, i) float32 1230.86 1230.86 1230.86 1230.86 1230.86 ...
    PHrefF    (k_p1) float32 0.0 9.81 20.9934 33.7464 48.3633 65.0403 ...
    drF       (k) float32 1.0 1.14 1.3 1.49 1.7 1.93 2.2 2.5 2.84 3.21 3.63 ...
    PHrefC    (k) float32 4.905 15.4017 27.3699 41.0549 56.7018 74.507 ...
    drC       (k_p1) float32 0.5 1.07 1.22 1.395 1.595 1.815 2.065 2.35 2.67 ...
    hFacW     (k, j, i_g) float32 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 ...
    hFacS     (k, j_g, i) float32 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 ...
    hFacC     (k, j, i) float32 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 1.0 ...
Attributes:
    coordinates:  face

Now, to extract the same timeseries, I would like to say

ts = ds.Salt[:, k, j, i].load()

I monitor what is happening under the hood using when I call this by using netdata and the dask.distributed dashboard, using only a single process and thread. First, all the files are opened (see #1394). Then they start getting read. Each read takes between 10 and 30 seconds, and the memory usage starts increasing steadily. My impression is that the entire dataset is being read into memory for concatenation. (I have dumped out the dask graph in case anyone can make sense of it.) I have never let this calculation complete, as it looks like it would eat up all the memory on my system...plus it's extremely slow.

To me, this seems like a failure of lazy indexing. I naively expected that the underlying file access would work similar to my loop, perhaps even in parallel.

Can anyone shed some light on what might be going wrong?

@shoyer
Copy link
Member

shoyer commented May 2, 2017

Can you try using Ctrl + C to interrupt things and report the stack-trace?

This might be an issue with xarray verifying aligned coordinates, which we should have an option to disable. (This came up somewhere else recently, but I couldn't find the issue with a quick search.)

@rabernat
Copy link
Contributor Author

rabernat commented May 2, 2017

The relevant part of the stack trace is

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/core/dataarray.py in load(self)
    571         working with many file objects on disk.
    572         """
--> 573         ds = self._to_temp_dataset().load()
    574         new = self._from_temp_dataset(ds)
    575         self._variable = new._variable

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/xarray/core/dataset.py in load(self)
    467 
    468             # evaluate all the dask arrays simultaneously
--> 469             evaluated_data = da.compute(*lazy_data.values())
    470 
    471             for k, data in zip(lazy_data, evaluated_data):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    200     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    201     keys = [var._keys() for var in variables]
--> 202     results = get(dsk, keys, **kwargs)
    203 
    204     results_iter = iter(results)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, **kwargs)
   1523         return sync(self.loop, self._get, dsk, keys, restrictions=restrictions,
   1524                     loose_restrictions=loose_restrictions,
-> 1525                     resources=resources)
   1526 
   1527     def _optimize_insert_futures(self, dsk, keys):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    200     loop.add_callback(f)
    201     while not e.is_set():
--> 202         e.wait(1000000)
    203     if error[0]:
    204         six.reraise(*error[0])

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/threading.py in wait(self, timeout)
    547             signaled = self._flag
    548             if not signaled:
--> 549                 signaled = self._cond.wait(timeout)
    550             return signaled
    551 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/threading.py in wait(self, timeout)
    295             else:
    296                 if timeout > 0:
--> 297                     gotit = waiter.acquire(True, timeout)
    298                 else:
    299                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

I think the issue you are referring to is also mine (#1385).

@shoyer
Copy link
Member

shoyer commented May 2, 2017

OK, so that isn't terribly useful -- the slow-down is somewhere in dask-land. If it was an issue with alignment, that would come up when building the dask graph, not computing it.

@shoyer
Copy link
Member

shoyer commented May 2, 2017

One thing worth trying is specifying chunks manually in open_mfdataset. Point-wise indexing should not really require chunks specified ahead of time, but the optimizations dask uses to make these operations efficient are somewhat fragile, so dask may be loading full arrays to do this computation.

@rabernat
Copy link
Contributor Author

rabernat commented May 2, 2017

dask may be loading full arrays to do this computation

This is definitely what I suspect is happening. The problem with adding more chunks is that I quickly hit my system ulimit (see #1394), since, for some reason, all the 1754 files are opened as soon as I call .load(). Putting more chunks in just multiplies that number.

@rabernat
Copy link
Contributor Author

I have created a self-contained, reproducible example of this serious performance problem.
https://gist.github.com/rabernat/7175328ee04a3167fa4dede1821964c6

This issue is becoming a big problem for me. I imagine other people must be experiencing it too.

I am happy to try to dig in and fix it, but I think some of @shoyer's backend insight would be valuable first.

@rabernat
Copy link
Contributor Author

This dask bug also explains why it is so slow to generate the repr for these big datasets.

@JanisGailis
Copy link

JanisGailis commented Jun 7, 2017

We had similar performance issues with xarray+dask, which we solved by using a chunking heuristic when opening a dataset. You can read about it in #1440. Now, in our case the data really wouldn't fit in memory, which is clearly not the case in your gist. Anyway, I thought I'd play around with your gist and see if chunking can make a difference.

I couldn't use your example directly, as the data it generates in memory is too large for the dev VM I'm on with this. So I changed the generated file size to (12, 1000, 2000), the essence of your gist remained though, it would take ~25 seconds to do the time series extraction, whereas ~800 ms using extract_point_xarray().

So, I thought I'd try our 'chunking heuristic' on the generated test datasets. Simply split the dataset in 2x2 chunks along spatial dimensions. So:

ds = xr.open_mfdataset(all_files, decode_cf=False, chunks={'time':12, 'x':1000, 'y':500})

To my surprise:

# time extracting a timeseries of a single point
y, x = 200, 300
with ProgressBar():
    %time ts = ds.data[:, y, x].load()

results in

[########################################] | 100% Completed |  0.7s
CPU times: user 124 ms, sys: 268 ms, total: 392 ms
Wall time: 826 ms

I'm not entirely sure what's happening, as the file obviously fits in memory just fine because the looping thing works well. Maybe it's fine when you loop through them one by one, but the single file chunk turns out to be too large when dask wants to parallelize the whole thing. I really have no idea.

I'd be very intrigued to see if you can get a similar result by doing a simple 2x2xtime chunking. By the way, chunks={'x':1000, 'y':500, 'time':1} produces similar results with some overhead. Extraction took ~1.5 seconds.

EDIT:

print(xr.__version__)
print(dask.__version__)
0.9.5
0.14.1

@rabernat
Copy link
Contributor Author

rabernat commented Jun 7, 2017

Hi @JanisGailis. Thanks for looking into this issue! I will give your solution a try as soon as I get some free time.

However, I would like to point out that the issue is completely resolved by dask/dask#2364. So this can probably be closed after the next dask release.

@JanisGailis
Copy link

JanisGailis commented Jun 7, 2017

That's great to know! I think there's no need to try my 'solution' then, maybe only out of pure interest.

It would of course be interesting to know why a 'custom' chunked dataset was apparently not affected by the bug. And if it was indeed the case.

EDIT: I read the discussion on dask github and the xarray mailinglist. It's probably because when explicit chunking is used, the chunks are not aliased and fusing works as expected.

@rohitshukla0104

This comment has been minimized.

@jhamman
Copy link
Member

jhamman commented Jan 13, 2019

closed via dask/dask#2364 (a long time ago)

@jhamman jhamman closed this as completed Jan 13, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants