Commit 89216ee: Cache files for different CachingFileManager objects separately (pydata#4879)

* Cache files for different CachingFileManager objects separately

This means that explicitly opening a file multiple times with
``open_dataset`` (e.g., after modifying it on disk) now reopens the file
from scratch, rather than reusing a cached version.

If users want to reuse the cached file, they can reuse the same xarray
object. We don't need this for handling many files in Dask (the original
motivation for caching), because in those cases only a single
CachingFileManager is created.

I think this should fix some long-standing usability issues: pydata#4240, pydata#4862

Conveniently, this also obviates the need for some messy reference
counting logic.
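As a rough illustration of the new behavior (a hypothetical sketch with a made-up file path, not code from this commit):

```python
import xarray as xr

# First explicit open: creates a new CachingFileManager and opens the file.
ds1 = xr.open_dataset("myfile.nc")

# ... suppose another process rewrites "myfile.nc" on disk here ...

# Second explicit open: previously this could be served from the cached
# (stale) file handle; with this change each call creates a distinct
# CachingFileManager, so the file is reopened from scratch and the
# on-disk modifications are picked up.
ds2 = xr.open_dataset("myfile.nc")

# To keep using the already-open file instead, keep using ds1.
```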

* Fix whats-new message location

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add id to CachingFileManager

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* restrict new test to only netCDF files

* fix whats-new message

* skip test on windows

* Revert "[pre-commit.ci] auto fixes from pre-commit.com hooks"

This reverts commit e637165.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Revert "Fix whats-new message location"

This reverts commit 6bc80e7.

* fixups

* fix syntax

* tweaks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix types for mypy

* add uuid

* restore ref_counts

* doc tweaks

* close files inside test_open_mfdataset_list_attr

* remove unused itertools

* don't use refcounts

* re-enable ref counting

* cleanup

* Apply typing suggestions from code review

Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>

* fix import of Hashable

* ignore __init__ type

* fix whats-new

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>
Co-authored-by: dcherian <deepak@cherian.net>
5 people authored and keewis committed Oct 19, 2022
1 parent 20ffe1d commit 89216ee
Showing 4 changed files with 150 additions and 71 deletions.
4 changes: 4 additions & 0 deletions doc/whats-new.rst
@@ -43,6 +43,10 @@ Deprecations
Bug fixes
~~~~~~~~~

+- Explicitly opening a file multiple times (e.g., after modifying it on disk)
+  now reopens the file from scratch for h5netcdf and scipy netCDF backends,
+  rather than reusing a cached version (:issue:`4240`, :issue:`4862`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.

Documentation
~~~~~~~~~~~~~
78 changes: 45 additions & 33 deletions xarray/backends/file_manager.py
@@ -3,21 +3,21 @@
import contextlib
import io
import threading
+import uuid
import warnings
-from typing import Any
+from typing import Any, Hashable

from ..core import utils
from ..core.options import OPTIONS
from .locks import acquire
from .lru_cache import LRUCache

# Global cache for storing open files.
-FILE_CACHE: LRUCache[str, io.IOBase] = LRUCache(
+FILE_CACHE: LRUCache[Any, io.IOBase] = LRUCache(
maxsize=OPTIONS["file_cache_maxsize"], on_evict=lambda k, v: v.close()
)
assert FILE_CACHE.maxsize, "file cache must be at least size one"


REF_COUNTS: dict[Any, int] = {}

_DEFAULT_MODE = utils.ReprObject("<unused>")
@@ -85,12 +85,13 @@ def __init__(
kwargs=None,
lock=None,
cache=None,
+manager_id: Hashable | None = None,
ref_counts=None,
):
"""Initialize a FileManager.
"""Initialize a CachingFileManager.
The cache and ref_counts arguments exist solely to facilitate
dependency injection, and should only be set for tests.
The cache, manager_id and ref_counts arguments exist solely to
facilitate dependency injection, and should only be set for tests.
Parameters
----------
@@ -120,6 +121,8 @@ def __init__(
global variable and contains non-picklable file objects, an
unpickled FileManager objects will be restored with the default
cache.
+manager_id : hashable, optional
+Identifier for this CachingFileManager.
ref_counts : dict, optional
Optional dict to use for keeping track the number of references to
the same file.
@@ -129,13 +132,17 @@
self._mode = mode
self._kwargs = {} if kwargs is None else dict(kwargs)

-self._default_lock = lock is None or lock is False
-self._lock = threading.Lock() if self._default_lock else lock
+self._use_default_lock = lock is None or lock is False
+self._lock = threading.Lock() if self._use_default_lock else lock

# cache[self._key] stores the file associated with this object.
if cache is None:
cache = FILE_CACHE
self._cache = cache
+if manager_id is None:
+# Each call to CachingFileManager should separately open files.
+manager_id = str(uuid.uuid4())
+self._manager_id = manager_id
self._key = self._make_key()

# ref_counts[self._key] stores the number of CachingFileManager objects
@@ -153,6 +160,7 @@ def _make_key(self):
self._args,
"a" if self._mode == "w" else self._mode,
tuple(sorted(self._kwargs.items())),
+self._manager_id,
)
return _HashedSequence(value)

@@ -223,20 +231,14 @@ def close(self, needs_lock=True):
if file is not None:
file.close()

-def __del__(self):
-# If we're the only CachingFileManger referencing a unclosed file, we
-# should remove it from the cache upon garbage collection.
+def __del__(self) -> None:
+# If we're the only CachingFileManger referencing a unclosed file,
+# remove it from the cache upon garbage collection.
#
-# Keeping our own count of file references might seem like overkill,
-# but it's actually pretty common to reopen files with the same
-# variable name in a notebook or command line environment, e.g., to
-# fix the parameters used when opening a file:
-# >>> ds = xarray.open_dataset('myfile.nc')
-# >>> ds = xarray.open_dataset('myfile.nc', decode_times=False)
-# This second assignment to "ds" drops CPython's ref-count on the first
-# "ds" argument to zero, which can trigger garbage collections. So if
-# we didn't check whether another object is referencing 'myfile.nc',
-# the newly opened file would actually be immediately closed!
+# We keep track of our own reference count because we don't want to
+# close files if another identical file manager needs it. This can
+# happen if a CachingFileManager is pickled and unpickled without
+# closing the original file.
ref_count = self._ref_counter.decrement(self._key)

if not ref_count and self._key in self._cache:
@@ -249,30 +251,40 @@

if OPTIONS["warn_for_unclosed_files"]:
warnings.warn(
"deallocating {}, but file is not already closed. "
"This may indicate a bug.".format(self),
f"deallocating {self}, but file is not already closed. "
"This may indicate a bug.",
RuntimeWarning,
stacklevel=2,
)

def __getstate__(self):
"""State for pickling."""
-# cache and ref_counts are intentionally omitted: we don't want to try
-# to serialize these global objects.
-lock = None if self._default_lock else self._lock
-return (self._opener, self._args, self._mode, self._kwargs, lock)
+# cache is intentionally omitted: we don't want to try to serialize
+# these global objects.
+lock = None if self._use_default_lock else self._lock
+return (
+self._opener,
+self._args,
+self._mode,
+self._kwargs,
+lock,
+self._manager_id,
+)

-def __setstate__(self, state):
+def __setstate__(self, state) -> None:
"""Restore from a pickle."""
-opener, args, mode, kwargs, lock = state
-self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)
+opener, args, mode, kwargs, lock, manager_id = state
+self.__init__(  # type: ignore
+opener, *args, mode=mode, kwargs=kwargs, lock=lock, manager_id=manager_id
+)

-def __repr__(self):
+def __repr__(self) -> str:
args_string = ", ".join(map(repr, self._args))
if self._mode is not _DEFAULT_MODE:
args_string += f", mode={self._mode!r}"
return "{}({!r}, {}, kwargs={})".format(
type(self).__name__, self._opener, args_string, self._kwargs
return (
f"{type(self).__name__}({self._opener!r}, {args_string}, "
f"kwargs={self._kwargs}, manager_id={self._manager_id!r})"
)


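To make the `_make_key` change in this diff concrete, here is a simplified, hypothetical sketch of why two managers with identical open arguments no longer share a cache entry (illustrative code mirroring `_make_key`, not the actual implementation):

```python
import uuid

def make_key(opener, args, mode, kwargs, manager_id):
    # Mirrors CachingFileManager._make_key: the uuid-based manager_id makes
    # the key unique per manager instance. Pickling round-trips manager_id
    # through __getstate__/__setstate__, so an unpickled manager still maps
    # to the same cache entry as its original.
    return (opener, args, "a" if mode == "w" else mode,
            tuple(sorted(kwargs.items())), manager_id)

key1 = make_key(open, ("myfile.nc",), "r", {}, str(uuid.uuid4()))
key2 = make_key(open, ("myfile.nc",), "r", {}, str(uuid.uuid4()))
assert key1 != key2  # same file and mode, but separate cache entries
```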
67 changes: 52 additions & 15 deletions xarray/tests/test_backends.py
@@ -1207,6 +1207,39 @@ def test_multiindex_not_implemented(self) -> None:
pass


+class NetCDFBase(CFEncodedBase):
+"""Tests for all netCDF3 and netCDF4 backends."""
+
+@pytest.mark.skipif(
+ON_WINDOWS, reason="Windows does not allow modifying open files"
+)
+def test_refresh_from_disk(self) -> None:
+# regression test for https://github.com/pydata/xarray/issues/4862
+
+with create_tmp_file() as example_1_path:
+with create_tmp_file() as example_1_modified_path:
+
+with open_example_dataset("example_1.nc") as example_1:
+self.save(example_1, example_1_path)
+
+example_1.rh.values += 100
+self.save(example_1, example_1_modified_path)
+
+a = open_dataset(example_1_path, engine=self.engine).load()
+
+# Simulate external process modifying example_1.nc while this script is running
+shutil.copy(example_1_modified_path, example_1_path)
+
+# Reopen example_1.nc (modified) as `b`; note that `a` has NOT been closed
+b = open_dataset(example_1_path, engine=self.engine).load()
+
+try:
+assert not np.array_equal(a.rh.values, b.rh.values)
+finally:
+a.close()
+b.close()


_counter = itertools.count()


@@ -1238,7 +1271,7 @@ def create_tmp_files(
yield files


-class NetCDF4Base(CFEncodedBase):
+class NetCDF4Base(NetCDFBase):
"""Tests for both netCDF4-python and h5netcdf."""

engine: T_NetcdfEngine = "netcdf4"
@@ -1595,6 +1628,10 @@ def test_setncattr_string(self) -> None:
assert_array_equal(one_element_list_of_strings, totest.attrs["bar"])
assert one_string == totest.attrs["baz"]

+@pytest.mark.skip(reason="https://github.com/Unidata/netcdf4-python/issues/1195")
+def test_refresh_from_disk(self) -> None:
+super().test_refresh_from_disk()


@requires_netCDF4
class TestNetCDF4AlreadyOpen:
@@ -3182,20 +3219,20 @@ def test_open_mfdataset_list_attr() -> None:

with create_tmp_files(2) as nfiles:
for i in range(2):
-f = Dataset(nfiles[i], "w")
-f.createDimension("x", 3)
-vlvar = f.createVariable("test_var", np.int32, ("x"))
-# here create an attribute as a list
-vlvar.test_attr = [f"string a {i}", f"string b {i}"]
-vlvar[:] = np.arange(3)
-f.close()
-ds1 = open_dataset(nfiles[0])
-ds2 = open_dataset(nfiles[1])
-original = xr.concat([ds1, ds2], dim="x")
-with xr.open_mfdataset(
-[nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
-) as actual:
-assert_identical(actual, original)
+with Dataset(nfiles[i], "w") as f:
+f.createDimension("x", 3)
+vlvar = f.createVariable("test_var", np.int32, ("x"))
+# here create an attribute as a list
+vlvar.test_attr = [f"string a {i}", f"string b {i}"]
+vlvar[:] = np.arange(3)
+
+with open_dataset(nfiles[0]) as ds1:
+with open_dataset(nfiles[1]) as ds2:
+original = xr.concat([ds1, ds2], dim="x")
+with xr.open_mfdataset(
+[nfiles[0], nfiles[1]], combine="nested", concat_dim="x"
+) as actual:
+assert_identical(actual, original)


@requires_scipy_or_netCDF4
72 changes: 49 additions & 23 deletions xarray/tests/test_backends_file_manager.py
@@ -7,6 +7,7 @@

import pytest

+# from xarray.backends import file_manager
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.lru_cache import LRUCache
from xarray.core.options import set_options
@@ -89,55 +90,80 @@ def test_file_manager_repr() -> None:
assert "my-file" in repr(manager)


-def test_file_manager_refcounts() -> None:
+def test_file_manager_cache_and_refcounts() -> None:
mock_file = mock.Mock()
opener = mock.Mock(spec=open, return_value=mock_file)
cache: dict = {}
ref_counts: dict = {}

manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
assert ref_counts[manager._key] == 1

+assert not cache
manager.acquire()
-assert cache
+assert len(cache) == 1

-manager2 = CachingFileManager(
-opener, "filename", cache=cache, ref_counts=ref_counts
-)
-assert cache
-assert manager._key == manager2._key
-assert ref_counts[manager._key] == 2
with set_options(warn_for_unclosed_files=False):
del manager
gc.collect()

+assert not ref_counts
+assert not cache
+
+
+def test_file_manager_cache_repeated_open() -> None:
+mock_file = mock.Mock()
+opener = mock.Mock(spec=open, return_value=mock_file)
+cache: dict = {}
+
+manager = CachingFileManager(opener, "filename", cache=cache)
+manager.acquire()
+assert len(cache) == 1
+
+manager2 = CachingFileManager(opener, "filename", cache=cache)
+manager2.acquire()
+assert len(cache) == 2
+
+with set_options(warn_for_unclosed_files=False):
+del manager
+gc.collect()

-assert cache
-assert ref_counts[manager2._key] == 1
-mock_file.close.assert_not_called()
+assert len(cache) == 1

with set_options(warn_for_unclosed_files=False):
del manager2
gc.collect()

-assert not ref_counts
assert not cache


-def test_file_manager_replace_object() -> None:
-opener = mock.Mock()
+def test_file_manager_cache_with_pickle(tmpdir) -> None:
+
+path = str(tmpdir.join("testing.txt"))
+with open(path, "w") as f:
+f.write("data")
cache: dict = {}
-ref_counts: dict = {}

-manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
-manager.acquire()
-assert ref_counts[manager._key] == 1
-assert cache
+with mock.patch("xarray.backends.file_manager.FILE_CACHE", cache):
+assert not cache

-manager = CachingFileManager(opener, "filename", cache=cache, ref_counts=ref_counts)
-assert ref_counts[manager._key] == 1
-assert cache
+manager = CachingFileManager(open, path, mode="r")
+manager.acquire()
+assert len(cache) == 1

-manager.close()
+manager2 = pickle.loads(pickle.dumps(manager))
+manager2.acquire()
+assert len(cache) == 1
+
+with set_options(warn_for_unclosed_files=False):
+del manager
+gc.collect()
+# assert len(cache) == 1
+
+with set_options(warn_for_unclosed_files=False):
+del manager2
+gc.collect()
+assert not cache


def test_file_manager_write_consecutive(tmpdir, file_cache) -> None:
