From 89216eefbcab12d0df4d4df51a0776c626bda0bd Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Tue, 18 Oct 2022 11:40:40 -0500
Subject: [PATCH] Cache files for different CachingFileManager objects
 separately (#4879)

* Cache files for different CachingFileManager objects separately

This means that explicitly opening a file multiple times with
``open_dataset`` (e.g., after modifying it on disk) now reopens the file
from scratch, rather than reusing a cached version.

If users want to reuse the cached file, they can reuse the same xarray
object. We don't need this for handling many files in Dask (the original
motivation for caching), because in those cases only a single
CachingFileManager is created.

I think this should fix some long-standing usability issues: #4240, #4862

Conveniently, this also obviates the need for some messy reference
counting logic.

* Fix whats-new message location

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add id to CachingFileManager

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* restrict new test to only netCDF files

* fix whats-new message

* skip test on windows

* Revert "[pre-commit.ci] auto fixes from pre-commit.com hooks"

This reverts commit e6371658bb0362240cc998208d3683b06cf71a88.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Revert "Fix whats-new message location"

This reverts commit 6bc80e705448c83b8c14b4d12e0c62d6d5436f95.

* fixups

* fix syntax

* tweaks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix types for mypy

* add uuid

* restore ref_counts

* doc tweaks

* close files inside test_open_mfdataset_list_attr

* remove unused itertools

* don't use refcounts

* re-enable ref counting

* cleanup

* Apply typing suggestions from code review

Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>

* fix import of Hashable

* ignore __init__ type

* fix whats-new

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Deepak Cherian
Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>
Co-authored-by: dcherian
---
 doc/whats-new.rst                          |  4 ++
 xarray/backends/file_manager.py            | 78 +++++++++++++---------
 xarray/tests/test_backends.py              | 67 ++++++++++++++-----
 xarray/tests/test_backends_file_manager.py | 72 +++++++++++++-------
 4 files changed, 150 insertions(+), 71 deletions(-)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 1ef0c5a72f4..5b9facf8eea 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -43,6 +43,10 @@ Deprecations
 
 Bug fixes
 ~~~~~~~~~
+- Explicitly opening a file multiple times (e.g., after modifying it on disk)
+  now reopens the file from scratch for h5netcdf and scipy netCDF backends,
+  rather than reusing a cached version (:issue:`4240`, :issue:`4862`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
 
 Documentation
 ~~~~~~~~~~~~~
diff --git a/xarray/backends/file_manager.py b/xarray/backends/file_manager.py
index e49555f780e..b09a6aa39bd 100644
--- a/xarray/backends/file_manager.py
+++ b/xarray/backends/file_manager.py
@@ -3,8 +3,9 @@
 import contextlib
 import io
 import threading
+import uuid
 import warnings
-from typing import Any
+from typing import Any, Hashable
 
 from ..core import utils
 from ..core.options import OPTIONS
@@ -12,12 +13,11 @@
 from .lru_cache import LRUCache
 
 # Global cache for storing open files.
-FILE_CACHE: LRUCache[str, io.IOBase] = LRUCache(
+FILE_CACHE: LRUCache[Any, io.IOBase] = LRUCache(
     maxsize=OPTIONS["file_cache_maxsize"], on_evict=lambda k, v: v.close()
 )
 assert FILE_CACHE.maxsize, "file cache must be at least size one"
 
-
 REF_COUNTS: dict[Any, int] = {}
 
 _DEFAULT_MODE = utils.ReprObject("<unused>")
@@ -85,12 +85,13 @@ def __init__(
         kwargs=None,
         lock=None,
         cache=None,
+        manager_id: Hashable | None = None,
         ref_counts=None,
     ):
-        """Initialize a FileManager.
+        """Initialize a CachingFileManager.
 
-        The cache and ref_counts arguments exist solely to facilitate
-        dependency injection, and should only be set for tests.
+        The cache, manager_id and ref_counts arguments exist solely to
+        facilitate dependency injection, and should only be set for tests.
 
         Parameters
         ----------
@@ -120,6 +121,8 @@ def __init__(
             global variable and contains non-picklable file objects, an
             unpickled FileManager objects will be restored with the default
             cache.
+        manager_id : hashable, optional
+            Identifier for this CachingFileManager.
         ref_counts : dict, optional
             Optional dict to use for keeping track the number of references
             to the same file.
@@ -129,13 +132,17 @@ def __init__(
         self._mode = mode
         self._kwargs = {} if kwargs is None else dict(kwargs)
 
-        self._default_lock = lock is None or lock is False
-        self._lock = threading.Lock() if self._default_lock else lock
+        self._use_default_lock = lock is None or lock is False
+        self._lock = threading.Lock() if self._use_default_lock else lock
 
         # cache[self._key] stores the file associated with this object.
         if cache is None:
             cache = FILE_CACHE
         self._cache = cache
+        if manager_id is None:
+            # Each call to CachingFileManager should separately open files.
+            manager_id = str(uuid.uuid4())
+        self._manager_id = manager_id
         self._key = self._make_key()
 
         # ref_counts[self._key] stores the number of CachingFileManager objects
@@ -153,6 +160,7 @@ def _make_key(self):
             self._args,
             "a" if self._mode == "w" else self._mode,
             tuple(sorted(self._kwargs.items())),
+            self._manager_id,
         )
         return _HashedSequence(value)
@@ -223,20 +231,14 @@ def close(self, needs_lock=True):
             if file is not None:
                 file.close()
 
-    def __del__(self):
-        # If we're the only CachingFileManger referencing a unclosed file, we
-        # should remove it from the cache upon garbage collection.
+    def __del__(self) -> None:
+        # If we're the only CachingFileManager referencing an unclosed file,
+        # remove it from the cache upon garbage collection.
         #
-        # Keeping our own count of file references might seem like overkill,
-        # but it's actually pretty common to reopen files with the same
-        # variable name in a notebook or command line environment, e.g., to
-        # fix the parameters used when opening a file:
-        # >>> ds = xarray.open_dataset('myfile.nc')
-        # >>> ds = xarray.open_dataset('myfile.nc', decode_times=False)
-        # This second assignment to "ds" drops CPython's ref-count on the first
-        # "ds" argument to zero, which can trigger garbage collections. So if
-        # we didn't check whether another object is referencing 'myfile.nc',
-        # the newly opened file would actually be immediately closed!
+        # We keep track of our own reference count because we don't want to
+        # close files if another identical file manager needs it. This can
+        # happen if a CachingFileManager is pickled and unpickled without
+        # closing the original file.
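+        #
+        # For example (a sketch; "example.txt" is a hypothetical path):
+        #   manager = CachingFileManager(open, "example.txt")
+        #   manager2 = pickle.loads(pickle.dumps(manager))  # same key, count 2
+        #   del manager   # count drops to 1; cached file stays open
+        #   del manager2  # count drops to 0; cached file is closed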
         ref_count = self._ref_counter.decrement(self._key)
 
         if not ref_count and self._key in self._cache:
@@ -249,30 +251,40 @@ def __del__(self):
 
             if OPTIONS["warn_for_unclosed_files"]:
                 warnings.warn(
-                    "deallocating {}, but file is not already closed. "
-                    "This may indicate a bug.".format(self),
+                    f"deallocating {self}, but file is not already closed. "
+                    "This may indicate a bug.",
                     RuntimeWarning,
                     stacklevel=2,
                 )
 
     def __getstate__(self):
         """State for pickling."""
-        # cache and ref_counts are intentionally omitted: we don't want to try
-        # to serialize these global objects.
-        lock = None if self._default_lock else self._lock
-        return (self._opener, self._args, self._mode, self._kwargs, lock)
+        # cache is intentionally omitted: we don't want to try to serialize
+        # these global objects.
+        lock = None if self._use_default_lock else self._lock
+        return (
+            self._opener,
+            self._args,
+            self._mode,
+            self._kwargs,
+            lock,
+            self._manager_id,
+        )
 
-    def __setstate__(self, state):
+    def __setstate__(self, state) -> None:
         """Restore from a pickle."""
-        opener, args, mode, kwargs, lock = state
-        self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)
+        opener, args, mode, kwargs, lock, manager_id = state
+        self.__init__(  # type: ignore
+            opener, *args, mode=mode, kwargs=kwargs, lock=lock, manager_id=manager_id
+        )
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         args_string = ", ".join(map(repr, self._args))
         if self._mode is not _DEFAULT_MODE:
             args_string += f", mode={self._mode!r}"
-        return "{}({!r}, {}, kwargs={})".format(
-            type(self).__name__, self._opener, args_string, self._kwargs
+        return (
+            f"{type(self).__name__}({self._opener!r}, {args_string}, "
+            f"kwargs={self._kwargs}, manager_id={self._manager_id!r})"
         )
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index eebbe569d63..a06c16bc115 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -1207,6 +1207,39 @@ def test_multiindex_not_implemented(self) -> None:
         pass
 
 
+class NetCDFBase(CFEncodedBase):
+    """Tests for all netCDF3 and netCDF4 backends."""
+
+    @pytest.mark.skipif(
+        ON_WINDOWS, reason="Windows does not allow modifying open files"
+    )
+    def test_refresh_from_disk(self) -> None:
+        # regression test for https://github.com/pydata/xarray/issues/4862
+
+        with create_tmp_file() as example_1_path:
+            with create_tmp_file() as example_1_modified_path:
+
+                with open_example_dataset("example_1.nc") as example_1:
+                    self.save(example_1, example_1_path)
+
+                    example_1.rh.values += 100
+                    self.save(example_1, example_1_modified_path)
+
+                a = open_dataset(example_1_path, engine=self.engine).load()
+
+                # Simulate external process modifying example_1.nc while this script is running
+                shutil.copy(example_1_modified_path, example_1_path)
+
+                # Reopen example_1.nc (modified) as `b`; note that `a` has NOT been closed
+                b = open_dataset(example_1_path, engine=self.engine).load()
+
+                try:
+                    assert not np.array_equal(a.rh.values, b.rh.values)
+                finally:
+                    a.close()
+                    b.close()
+
+
 _counter = itertools.count()
 
 
@@ -1238,7 +1271,7 @@ def create_tmp_files(
         yield files
 
 
-class NetCDF4Base(CFEncodedBase):
+class NetCDF4Base(NetCDFBase):
     """Tests for both netCDF4-python and h5netcdf."""
 
     engine: T_NetcdfEngine = "netcdf4"
@@ -1595,6 +1628,10 @@ def test_setncattr_string(self) -> None:
             assert_array_equal(one_element_list_of_strings, totest.attrs["bar"])
             assert one_string == totest.attrs["baz"]
 
+    @pytest.mark.skip(reason="https://github.com/Unidata/netcdf4-python/issues/1195")
+    def test_refresh_from_disk(self) -> None:
+        super().test_refresh_from_disk()
+
 
 @requires_netCDF4
 class TestNetCDF4AlreadyOpen:
@@ -3182,20 +3219,20 @@ def test_open_mfdataset_list_attr() -> None:
 
     with create_tmp_files(2) as nfiles:
         for i in range(2):
-            f = Dataset(nfiles[i], "w")
-            f.createDimension("x", 3)
-            vlvar = f.createVariable("test_var", np.int32, ("x"))
-            # here create an attribute as a list
-            vlvar.test_attr = [f"string a {i}", f"string b {i}"]
-            vlvar[:] = np.arange(3)
-            f.close()
-        ds1 = open_dataset(nfiles[0])
-        ds2 = open_dataset(nfiles[1])
-        original = xr.concat([ds1, ds2], dim="x")
-        with xr.open_mfdataset(
-            [nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
-        ) as actual:
-            assert_identical(actual, original)
+            with Dataset(nfiles[i], "w") as f:
+                f.createDimension("x", 3)
+                vlvar = f.createVariable("test_var", np.int32, ("x"))
+                # here create an attribute as a list
+                vlvar.test_attr = [f"string a {i}", f"string b {i}"]
+                vlvar[:] = np.arange(3)
+
+        with open_dataset(nfiles[0]) as ds1:
+            with open_dataset(nfiles[1]) as ds2:
+                original = xr.concat([ds1, ds2], dim="x")
+                with xr.open_mfdataset(
+                    [nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
+                ) as actual:
+                    assert_identical(actual, original)
 
 
 @requires_scipy_or_netCDF4
diff --git a/xarray/tests/test_backends_file_manager.py b/xarray/tests/test_backends_file_manager.py
index 726ffa62354..1bd66164436 100644
--- a/xarray/tests/test_backends_file_manager.py
+++ b/xarray/tests/test_backends_file_manager.py
@@ -7,6 +7,7 @@
 
 import pytest
 
+# from xarray.backends import file_manager
 from xarray.backends.file_manager import CachingFileManager
 from xarray.backends.lru_cache import LRUCache
 from xarray.core.options import set_options
@@ -89,7 +90,7 @@ def test_file_manager_repr() -> None:
     assert "my-file" in repr(manager)
 
 
-def test_file_manager_refcounts() -> None:
+def test_file_manager_cache_and_refcounts() -> None:
     mock_file = mock.Mock()
     opener = mock.Mock(spec=open, return_value=mock_file)
     cache: dict = {}
@@ -97,47 +98,72 @@
     manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
     assert ref_counts[manager._key] == 1
+
+    assert not cache
     manager.acquire()
-    assert cache
+    assert len(cache) == 1
 
-    manager2 = CachingFileManager(
-        opener, "filename", cache=cache, ref_counts=ref_counts
-    )
-    assert cache
-    assert manager._key == manager2._key
-    assert ref_counts[manager._key] == 2
+    with set_options(warn_for_unclosed_files=False):
+        del manager
+        gc.collect()
+
+    assert not ref_counts
+    assert not cache
+
+
+def test_file_manager_cache_repeated_open() -> None:
+    mock_file = mock.Mock()
+    opener = mock.Mock(spec=open, return_value=mock_file)
+    cache: dict = {}
+
+    manager = CachingFileManager(opener, "filename", cache=cache)
+    manager.acquire()
+    assert len(cache) == 1
+
+    manager2 = CachingFileManager(opener, "filename", cache=cache)
+    manager2.acquire()
+    assert len(cache) == 2
 
     with set_options(warn_for_unclosed_files=False):
         del manager
         gc.collect()
 
-    assert cache
-    assert ref_counts[manager2._key] == 1
-    mock_file.close.assert_not_called()
+    assert len(cache) == 1
 
     with set_options(warn_for_unclosed_files=False):
         del manager2
         gc.collect()
 
-    assert not ref_counts
     assert not cache
 
 
-def test_file_manager_replace_object() -> None:
-    opener = mock.Mock()
+def test_file_manager_cache_with_pickle(tmpdir) -> None:
+
+    path = str(tmpdir.join("testing.txt"))
+    with open(path, "w") as f:
+        f.write("data")
     cache: dict = {}
-    ref_counts: dict = {}
-    manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
-    manager.acquire()
-    assert ref_counts[manager._key] == 1
-    assert cache
+    with mock.patch("xarray.backends.file_manager.FILE_CACHE", cache):
+        assert not cache
 
-    manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
-    assert ref_counts[manager._key] == 1
-    assert cache
+        manager = CachingFileManager(open, path, mode="r")
+        manager.acquire()
+        assert len(cache) == 1
 
-    manager.close()
+        manager2 = pickle.loads(pickle.dumps(manager))
+        manager2.acquire()
+        assert len(cache) == 1
+
+        with set_options(warn_for_unclosed_files=False):
+            del manager
+            gc.collect()
+        # assert len(cache) == 1
+
+        with set_options(warn_for_unclosed_files=False):
+            del manager2
+            gc.collect()
+        assert not cache
 
 
 def test_file_manager_write_consecutive(tmpdir, file_cache) -> None:
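
For illustration, a sketch of the user-visible behavior this patch enables
(assuming an existing netCDF file at the hypothetical path "example.nc" and
the h5netcdf backend, one of the backends named in the whats-new entry above):

    import xarray as xr

    # First open: a new CachingFileManager gets a fresh uuid-based
    # manager_id, which is part of its cache key.
    ds1 = xr.open_dataset("example.nc", engine="h5netcdf")

    # ... an external process modifies example.nc on disk ...

    # Second open: another CachingFileManager, another manager_id, another
    # cache key -- so the file is reopened from scratch and the on-disk
    # changes are visible, instead of reusing ds1's cached file handle.
    ds2 = xr.open_dataset("example.nc", engine="h5netcdf")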