Using kerchunk to reference large sets of netcdf4 files #240

Closed
dougiesquire opened this issue Oct 20, 2022 · 11 comments
@dougiesquire (Contributor) commented Oct 20, 2022:

Firstly, thanks for this great tool!

I’m trying to generate a kerchunk reference dataset for many chunked netcdf4 files that comprise a single climate model experiment. I’m doing this on a local file system, not in a cloud environment, similar to #123.

The climate model experiment in question comprises 2TB across 61 netcdf files (unfortunately I can’t share these data). I generate a single reference json using the approach provided in the tutorial (code below). This all works well, and I can open my combined dataset using xarray.open_dataset and see that it has the correct structure and chunking.

However, when I try to perform a small compute on a variable in this dataset using a dask distributed cluster (with 4GB per worker) I immediately run out of memory. My reference json is ~1GB. Is this being loaded by each worker? I am confused because there are examples in the docs of this approach being applied to 80TB datasets. However, based on my simple example, I would’ve thought that the reference json(s) for an 80TB dataset would be prohibitively large. Am I doing something wrong/misunderstanding? Any advice would be much appreciated.

import fsspec
import ujson
import dask
import xarray as xr
from distributed import Client
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr


client = Client(n_workers=4)
print(f"Dask dashboard at: {client.dashboard_link}")

# Write single file jsons in parallel

fs = fsspec.filesystem('file')
flist = fs.glob("experiment1/ncfile*.nc")

@dask.delayed
def gen_json(file):
    # Translate one netcdf4 file into a single-file kerchunk reference json
    with fs.open(file) as infile:
        h5chunks = SingleHdf5ToZarr(infile, file)
        outf = f"{file.split('/')[-1]}.json"
        with open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())

dask.compute(*[gen_json(file) for file in flist])

# Combine into multifile json

json_list = fs.glob("./ncfile*.nc.json")

mzz = MultiZarrToZarr(
    json_list,
    concat_dims=['time'],
)

d = mzz.translate("experiment1.json")

# Open the reference dataset

m = fsspec.get_mapper(
    'reference://', 
    fo="experiment1.json", 
    remote_protocol="file"
)
ds = xr.open_dataset(
    m,
    engine='zarr', 
    backend_kwargs={"consolidated": False},
    chunks={},
    decode_times=False
)

# Performing a simple compute task on a single variable of ds uses large amounts of memory

ds["variable"].mean(["lon", "lat"]).compute()
@martindurant (Member):

The total number of chunks in the target matters, as (for now) we are storing the references as inefficient JSON and loading them into Python objects. If your JSON is 1GB, I assume you must truly have a huge number of references. Yes, this will be loaded by every worker; I am not sure (to be tested) whether the file location or the in-memory reference set is being passed.

It is plausible that we could provide a way to pass a worker only those references that we know from the graph it will need. In the near future, we mean to revisit the on-disk and in-memory data structure for the reference set, and implement laziness, so that references are loaded batch by batch as required.

If you omit the distributed client, the threads share the single in-memory copy of the references and, since the compute is GIL-releasing, you still get parallelism. That's my recommendation for this particular compute.

Another thing worth considering is the sheer size of the compute graph itself. It is possible, and efficient, to use chunks to create dask partitions that are a (large) multiple of the base chunk size. This greatly cuts down the number of tasks in the graph, and might solve the problem by itself.
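A minimal sketch combining both suggestions (the chunk size of 10 along time and the variable name are placeholders, not tuned to your data):

import fsspec
import xarray as xr

m = fsspec.get_mapper(
    "reference://",
    fo="experiment1.json",
    remote_protocol="file",
)
# dask partitions spanning several native chunks along "time"
ds = xr.open_dataset(
    m,
    engine="zarr",
    backend_kwargs={"consolidated": False},
    chunks={"time": 10},  # placeholder: pick a (large) multiple of the native chunk size
)
# threaded scheduler: one in-process copy of the references, GIL-releasing compute
ds["variable"].mean(["lon", "lat"]).compute(scheduler="threads")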

@dougiesquire (Contributor, Author):

Thanks very much for this helpful response, @martindurant!

I've explored omitting the distributed client and rechunking, but my kerchunk reference dataset is always slower to compute on than if I just open the equivalent dataset with xarray.open_mfdataset, I guess because of the additional overhead of my large json file (direct comparison in this notebook, if you're interested).

Lazy loading of the reference set sounds awesome!

@martindurant (Member):

It would be interesting to get a profile for cell 4 - is all the time spent in json decoding (see #241)? Similarly, the distributed client will tell you what the workers are busy doing (you can get a profile report or just look on the dashboard).

I am not familiar with that pattern: what does chunks={"time": -1} do? I had meant that, assuming the inherent time chunking is 1, you would use something like chunks={"time": 10}.

@rsignell-usgs (Collaborator) commented Oct 21, 2022:

chunks={'time': -1} sets the chunk size for time to the full length of the time variable.
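For illustration, with the mapper m from the question above (a sketch, not from the original thread):

# one dask chunk spanning the whole time axis, per variable
ds_whole = xr.open_dataset(m, engine='zarr',
                           backend_kwargs={"consolidated": False},
                           chunks={"time": -1})
# versus dask chunks of 10 time steps each
ds_batched = xr.open_dataset(m, engine='zarr',
                             backend_kwargs={"consolidated": False},
                             chunks={"time": 10})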

@dougiesquire (Contributor, Author):

I presume you mean this cell 4 (apologies, there were a few cells labelled "[4]" in the notebook)? Note the output below is truncated.

[screenshot: profile output for cell 4, truncated]

Then, the dask dashboard shows large amounts of unmanaged memory during the example compute operation on ds. Presumably this is the (1.1GB) json file:

[screenshot: dask dashboard showing large amounts of unmanaged memory]

What's not immediately clear to me is why this example compute takes so much longer than if I just open/concat the dataset with open_mfdataset. The number of tasks is actually smaller when using the kerchunk dataset (44 tasks vs 305 tasks for the same operation performed on the dataset opened with open_mfdataset).

@martindurant (Member):

So we see that _listdir_from_keys is taking up half the total time of opening the dataset, and this can be entirely eliminated. I believe the following would do it, if you don't mind playing:

import zarr

# write a consolidated .zmetadata key into the in-memory reference set
zarr.convenience.consolidate_metadata(m)
# persist the updated references (new_reference_file is your output path)
m.fs.save_json(new_reference_file)

and then load the new file instead. The second load should be much faster. This is with the very latest fsspec, by the way. We may decide to make this automatic in the future, and it won't matter for zarr V3.
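For example (a sketch; the output filename is a placeholder for whatever you passed as new_reference_file):

m2 = fsspec.get_mapper(
    "reference://",
    fo="experiment1_consolidated.json",
    remote_protocol="file",
)
# the consolidated .zmetadata key now exists, so no key listing is needed
ds = xr.open_dataset(m2, engine="zarr",
                     backend_kwargs={"consolidated": True}, chunks={})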

@martindurant (Member):

The other questions:

  • Yes, the workers are taking up extra memory just for the filesystem instance. You can make sure this is freed after use with skip_instance_cache=True as an argument to the filesystem/mapper (but then new tasks on a worker will need to load the JSON every time); see the sketch after this list.
  • I don't know why you get different partition/task sizes; I recommend passing explicit numbers in chunks={...} to be sure of a fair comparison.
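A minimal sketch of the skip_instance_cache option (same placeholder filename as above):

m = fsspec.get_mapper(
    "reference://",
    fo="experiment1.json",
    remote_protocol="file",
    skip_instance_cache=True,  # don't cache the filesystem instance, so its references can be freed
)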

@dougiesquire (Contributor, Author):

> So we see that _listdir_from_keys is taking up half the total time of opening the dataset, and this can be entirely eliminated. I believe the following would do it, if you don't mind playing:
>
> import zarr
> zarr.convenience.consolidate_metadata(m)
> m.fs.save_json(new_reference_file)
>
> and then load the new file instead. The second load should be much faster.

Thanks, @martindurant. Yes, consolidating the metadata in this way pretty much halves the time taken to open the dataset.

I'm excited to play with lazy reference formats if/when they become available. I'll close this issue in the meantime. Thanks for your help!

@martindurant (Member):

cc #237 (@jhamman); I would happily add consolidation to all kerchunk output, and could even remove the duplicate metadata keys. In v3, do you know if/when a directory listing would be needed? We need to ensure that reading a reference set can happen without an ls() operation, or provide an alternative way to get the listing other than going through all the keys.

@jhamman (Contributor) commented Nov 9, 2022:

Zarr v3's spec does not (yet) include consolidated metadata. This extension needs to be written by someone (zarr-developers/zarr-specs#136).

In the v2 spec, consolidated metadata only consolidates the metadata keys. However, in v3, you might imagine a use case where a Zarr storage transformer is used to consolidate the listing of the chunk keys. This sounds a lot like the Kerchunk reference spec and could also be thought of as a general chunk manifest (zarr-developers/zarr-specs#82).


> In v3, do you know if/when a directory listing would be needed?

To answer your question specifically: in the xarray context, open_zarr() needs to list all metadata keys to determine what variables exist in the group. This is the only listing that should be required (though I think the current v3 implementation includes a few unnecessary listings; we're working to fix this).

@martindurant (Member):

OK, so either way we will need an alternative mechanism to list at least some things (top level dirs for v2, meta files for v3), since we absolutely must avoid having to do string manipulation across all references.
