Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache files for different CachingFileManager objects separately #4879

Merged
merged 36 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a80cfd2
Cache files for different CachingFileManager objects separately
shoyer Feb 7, 2021
d8b3bb7
Merge branch 'main' into file-manager-not-shared
shoyer Sep 27, 2022
6bc80e7
Fix whats-new message location
shoyer Sep 27, 2022
e637165
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 27, 2022
d587bfc
Add id to CachingFileManager
shoyer Sep 27, 2022
4f4ba13
Merge branch 'main' into file-manager-not-shared
shoyer Sep 27, 2022
e93f1f5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 27, 2022
257eb00
restrict new test to only netCDF files
shoyer Sep 27, 2022
7d857f3
fix whats-new message
shoyer Sep 27, 2022
a3556d1
skip test on windows
shoyer Sep 27, 2022
7c7c4e8
Revert "[pre-commit.ci] auto fixes from pre-commit.com hooks"
shoyer Sep 28, 2022
c51e81e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 28, 2022
89c2b55
Revert "Fix whats-new message location"
shoyer Sep 28, 2022
c320acb
fixups
shoyer Sep 28, 2022
2cab733
fix syntax
shoyer Sep 28, 2022
997c3d4
Merge branch 'main' into file-manager-not-shared
shoyer Oct 3, 2022
a3486c8
tweaks
shoyer Oct 3, 2022
d24914e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 3, 2022
7105ec2
fix types for mypy
shoyer Oct 3, 2022
38c2a16
add uuid
shoyer Oct 3, 2022
6d6e2dd
restore ref_counts
shoyer Oct 4, 2022
d95f9f0
Merge branch 'main' into file-manager-not-shared
shoyer Oct 4, 2022
a837f3b
doc tweaks
shoyer Oct 5, 2022
25706eb
close files inside test_open_mfdataset_list_attr
shoyer Oct 5, 2022
1466c82
remove unused itertools
shoyer Oct 5, 2022
382d734
don't use refcounts
shoyer Oct 5, 2022
46f4fef
re-enable ref counting
shoyer Oct 5, 2022
3ec678e
cleanup
shoyer Oct 5, 2022
929e5d1
Merge branch 'main' into file-manager-not-shared
dcherian Oct 12, 2022
fe7b3c3
Apply typing suggestions from code review
dcherian Oct 12, 2022
915976d
Merge branch 'main' into file-manager-not-shared
dcherian Oct 12, 2022
06c5d51
fix import of Hashable
dcherian Oct 13, 2022
cb16f88
Merge branch 'main' into file-manager-not-shared
dcherian Oct 13, 2022
e05cb3b
ignore __init__ type
shoyer Oct 14, 2022
4dfbfc4
Merge branch 'main' into file-manager-not-shared
dcherian Oct 17, 2022
a5bf621
fix whats-new
dcherian Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ Deprecations

Bug fixes
~~~~~~~~~

- Explicitly opening a file multiple times (e.g., after modifying it on disk)
dcherian marked this conversation as resolved.
Show resolved Hide resolved
now reopens the file from scratch for h5netcdf and scipy netCDF backends,
rather than reusing a cached version (:issue:`4240`, :issue:`4862`).
By `Stephan Hoyer <https://github.com/shoyer>`_.
- Fixed :py:meth:`Dataset.transpose` to raise a more informative error. (:issue:`6502`, :pull:`7120`)
By `Patrick Naylor <https://github.com/patrick-naylor>`_

Expand Down
70 changes: 41 additions & 29 deletions xarray/backends/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextlib
import io
import threading
import uuid
import warnings
from typing import Any

Expand All @@ -12,12 +13,11 @@
from .lru_cache import LRUCache

# Global cache for storing open files.
FILE_CACHE: LRUCache[str, io.IOBase] = LRUCache(
FILE_CACHE: LRUCache[Any, io.IOBase] = LRUCache(
maxsize=OPTIONS["file_cache_maxsize"], on_evict=lambda k, v: v.close()
)
assert FILE_CACHE.maxsize, "file cache must be at least size one"


REF_COUNTS: dict[Any, int] = {}

_DEFAULT_MODE = utils.ReprObject("<unused>")
Expand Down Expand Up @@ -85,12 +85,13 @@ def __init__(
kwargs=None,
lock=None,
cache=None,
manager_id=None,
dcherian marked this conversation as resolved.
Show resolved Hide resolved
ref_counts=None,
):
"""Initialize a FileManager.
"""Initialize a CachingFileManager.

The cache and ref_counts arguments exist solely to facilitate
dependency injection, and should only be set for tests.
The cache, manager_id and ref_counts arguments exist solely to
facilitate dependency injection, and should only be set for tests.

Parameters
----------
Expand Down Expand Up @@ -120,6 +121,8 @@ def __init__(
global variable and contains non-picklable file objects, an
unpickled FileManager objects will be restored with the default
cache.
manager_id : hashable, optional
Identifier for this CachingFileManager.
ref_counts : dict, optional
Optional dict to use for keeping track of the number of references to
the same file.
Expand All @@ -129,13 +132,17 @@ def __init__(
self._mode = mode
self._kwargs = {} if kwargs is None else dict(kwargs)

self._default_lock = lock is None or lock is False
self._lock = threading.Lock() if self._default_lock else lock
self._use_default_lock = lock is None or lock is False
self._lock = threading.Lock() if self._use_default_lock else lock

# cache[self._key] stores the file associated with this object.
if cache is None:
cache = FILE_CACHE
self._cache = cache
if manager_id is None:
# Each call to CachingFileManager should separately open files.
manager_id = str(uuid.uuid4())
self._manager_id = manager_id
self._key = self._make_key()

# ref_counts[self._key] stores the number of CachingFileManager objects
Expand All @@ -153,6 +160,7 @@ def _make_key(self):
self._args,
"a" if self._mode == "w" else self._mode,
tuple(sorted(self._kwargs.items())),
self._manager_id,
)
return _HashedSequence(value)

Expand Down Expand Up @@ -224,19 +232,13 @@ def close(self, needs_lock=True):
file.close()

def __del__(self):
dcherian marked this conversation as resolved.
Show resolved Hide resolved
# If we're the only CachingFileManager referencing an unclosed file, we
# should remove it from the cache upon garbage collection.
# If we're the only CachingFileManager referencing an unclosed file,
# remove it from the cache upon garbage collection.
#
# Keeping our own count of file references might seem like overkill,
# but it's actually pretty common to reopen files with the same
# variable name in a notebook or command line environment, e.g., to
# fix the parameters used when opening a file:
# >>> ds = xarray.open_dataset('myfile.nc')
# >>> ds = xarray.open_dataset('myfile.nc', decode_times=False)
# This second assignment to "ds" drops CPython's ref-count on the first
# "ds" argument to zero, which can trigger garbage collections. So if
# we didn't check whether another object is referencing 'myfile.nc',
# the newly opened file would actually be immediately closed!
# We keep track of our own reference count because we don't want to
# close files if another identical file manager needs it. This can
# happen if a CachingFileManager is pickled and unpickled without
# closing the original file.
ref_count = self._ref_counter.decrement(self._key)

if not ref_count and self._key in self._cache:
Expand All @@ -249,30 +251,40 @@ def __del__(self):

if OPTIONS["warn_for_unclosed_files"]:
warnings.warn(
"deallocating {}, but file is not already closed. "
"This may indicate a bug.".format(self),
f"deallocating {self}, but file is not already closed. "
"This may indicate a bug.",
RuntimeWarning,
stacklevel=2,
)

def __getstate__(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def __getstate__(self):
def __getstate__(self) -> tuple[Any, Any, Any, Any, Any, Any]:

The Any's can be replaced with narrower versions, I couldn't figure them out on a quick glance.

"""State for pickling."""
# cache and ref_counts are intentionally omitted: we don't want to try
# to serialize these global objects.
lock = None if self._default_lock else self._lock
return (self._opener, self._args, self._mode, self._kwargs, lock)
# cache is intentionally omitted: we don't want to try to serialize
# these global objects.
lock = None if self._use_default_lock else self._lock
return (
self._opener,
self._args,
self._mode,
self._kwargs,
lock,
self._manager_id,
Copy link
Collaborator

@headtr1ck headtr1ck Oct 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know enough what exactly this is used for, but make sure that you don't need to do a similar thing as for lock (replace with None in case it is default).
But ignore this comment if this is totally intentional :)

)

def __setstate__(self, state):
dcherian marked this conversation as resolved.
Show resolved Hide resolved
"""Restore from a pickle."""
opener, args, mode, kwargs, lock = state
self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)
opener, args, mode, kwargs, lock, manager_id = state
self.__init__(
opener, *args, mode=mode, kwargs=kwargs, lock=lock, manager_id=manager_id
)

def __repr__(self):
dcherian marked this conversation as resolved.
Show resolved Hide resolved
args_string = ", ".join(map(repr, self._args))
if self._mode is not _DEFAULT_MODE:
args_string += f", mode={self._mode!r}"
return "{}({!r}, {}, kwargs={})".format(
type(self).__name__, self._opener, args_string, self._kwargs
return (
f"{type(self).__name__}({self._opener!r}, {args_string}, "
f"kwargs={self._kwargs}, manager_id={self._manager_id!r})"
)


Expand Down
71 changes: 54 additions & 17 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,39 @@ def test_multiindex_not_implemented(self):
pass


class NetCDFBase(CFEncodedBase):
"""Tests for all netCDF3 and netCDF4 backends."""

@pytest.mark.skipif(
ON_WINDOWS, reason="Windows does not allow modifying open files"
)
def test_refresh_from_disk(self):
dcherian marked this conversation as resolved.
Show resolved Hide resolved
# regression test for https://github.com/pydata/xarray/issues/4862

with create_tmp_file() as example_1_path:
with create_tmp_file() as example_1_modified_path:

with open_example_dataset("example_1.nc") as example_1:
self.save(example_1, example_1_path)

example_1.rh.values += 100
self.save(example_1, example_1_modified_path)

a = open_dataset(example_1_path, engine=self.engine).load()

# Simulate external process modifying example_1.nc while this script is running
shutil.copy(example_1_modified_path, example_1_path)

# Reopen example_1.nc (modified) as `b`; note that `a` has NOT been closed
b = open_dataset(example_1_path, engine=self.engine).load()

try:
assert not np.array_equal(a.rh.values, b.rh.values)
finally:
a.close()
b.close()


_counter = itertools.count()


Expand Down Expand Up @@ -1223,7 +1256,7 @@ def create_tmp_files(nfiles, suffix=".nc", allow_cleanup_failure=False):
yield files


class NetCDF4Base(CFEncodedBase):
class NetCDF4Base(NetCDFBase):
"""Tests for both netCDF4-python and h5netcdf."""

engine = "netcdf4"
Expand Down Expand Up @@ -1580,6 +1613,10 @@ def test_setncattr_string(self):
assert_array_equal(one_element_list_of_strings, totest.attrs["bar"])
assert one_string == totest.attrs["baz"]

@pytest.mark.skip(reason="https://github.com/Unidata/netcdf4-python/issues/1195")
def test_refresh_from_disk(self):
dcherian marked this conversation as resolved.
Show resolved Hide resolved
super().test_refresh_from_disk()


@requires_netCDF4
class TestNetCDF4AlreadyOpen:
Expand Down Expand Up @@ -2547,7 +2584,7 @@ def test_pickle_dataarray(self):


@requires_scipy
class TestScipyFilePath(CFEncodedBase, NetCDF3Only):
class TestScipyFilePath(NetCDFBase, NetCDF3Only):
engine = "scipy"

@contextlib.contextmanager
Expand Down Expand Up @@ -2584,7 +2621,7 @@ def test_nc4_scipy(self):


@requires_netCDF4
class TestNetCDF3ViaNetCDF4Data(CFEncodedBase, NetCDF3Only):
class TestNetCDF3ViaNetCDF4Data(NetCDFBase, NetCDF3Only):
engine = "netcdf4"
file_format = "NETCDF3_CLASSIC"

Expand Down Expand Up @@ -3167,20 +3204,20 @@ def test_open_mfdataset_list_attr():

with create_tmp_files(2) as nfiles:
for i in range(2):
f = Dataset(nfiles[i], "w")
f.createDimension("x", 3)
vlvar = f.createVariable("test_var", np.int32, ("x"))
# here create an attribute as a list
vlvar.test_attr = [f"string a {i}", f"string b {i}"]
vlvar[:] = np.arange(3)
f.close()
ds1 = open_dataset(nfiles[0])
ds2 = open_dataset(nfiles[1])
original = xr.concat([ds1, ds2], dim="x")
with xr.open_mfdataset(
[nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
) as actual:
assert_identical(actual, original)
with Dataset(nfiles[i], "w") as f:
f.createDimension("x", 3)
vlvar = f.createVariable("test_var", np.int32, ("x"))
# here create an attribute as a list
vlvar.test_attr = [f"string a {i}", f"string b {i}"]
vlvar[:] = np.arange(3)

with open_dataset(nfiles[0]) as ds1:
with open_dataset(nfiles[1]) as ds2:
original = xr.concat([ds1, ds2], dim="x")
with xr.open_mfdataset(
[nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
) as actual:
assert_identical(actual, original)


@requires_scipy_or_netCDF4
Expand Down
72 changes: 49 additions & 23 deletions xarray/tests/test_backends_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pytest

# from xarray.backends import file_manager
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.lru_cache import LRUCache
from xarray.core.options import set_options
Expand Down Expand Up @@ -89,55 +90,80 @@ def test_file_manager_repr() -> None:
assert "my-file" in repr(manager)


def test_file_manager_refcounts() -> None:
def test_file_manager_cache_and_refcounts() -> None:
mock_file = mock.Mock()
opener = mock.Mock(spec=open, return_value=mock_file)
cache: dict = {}
ref_counts: dict = {}

manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
assert ref_counts[manager._key] == 1

assert not cache
manager.acquire()
assert cache
assert len(cache) == 1

manager2 = CachingFileManager(
opener, "filename", cache=cache, ref_counts=ref_counts
)
assert cache
assert manager._key == manager2._key
assert ref_counts[manager._key] == 2
with set_options(warn_for_unclosed_files=False):
del manager
gc.collect()

assert not ref_counts
assert not cache


def test_file_manager_cache_repeated_open() -> None:
mock_file = mock.Mock()
opener = mock.Mock(spec=open, return_value=mock_file)
cache: dict = {}

manager = CachingFileManager(opener, "filename", cache=cache)
manager.acquire()
assert len(cache) == 1

manager2 = CachingFileManager(opener, "filename", cache=cache)
manager2.acquire()
assert len(cache) == 2

with set_options(warn_for_unclosed_files=False):
del manager
gc.collect()

assert cache
assert ref_counts[manager2._key] == 1
mock_file.close.assert_not_called()
assert len(cache) == 1

with set_options(warn_for_unclosed_files=False):
del manager2
gc.collect()

assert not ref_counts
assert not cache


def test_file_manager_replace_object() -> None:
opener = mock.Mock()
def test_file_manager_cache_with_pickle(tmpdir) -> None:

path = str(tmpdir.join("testing.txt"))
with open(path, "w") as f:
f.write("data")
cache: dict = {}
ref_counts: dict = {}

manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
manager.acquire()
assert ref_counts[manager._key] == 1
assert cache
with mock.patch("xarray.backends.file_manager.FILE_CACHE", cache):
assert not cache

manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
assert ref_counts[manager._key] == 1
assert cache
manager = CachingFileManager(open, path, mode="r")
manager.acquire()
assert len(cache) == 1

manager.close()
manager2 = pickle.loads(pickle.dumps(manager))
manager2.acquire()
assert len(cache) == 1

with set_options(warn_for_unclosed_files=False):
del manager
gc.collect()
# assert len(cache) == 1

with set_options(warn_for_unclosed_files=False):
del manager2
gc.collect()
assert not cache


def test_file_manager_write_consecutive(tmpdir, file_cache) -> None:
Expand Down