Add Job.cached_statepoint for fast read access to state point (#975)
* Make additional use of the statepoint cache.

_StatePointDict takes significant time to initialize, even when the statepoint dict is
known.

Adjust `Job` initialization to make more use of the statepoint cache and initialize
`_StatePointDict` only when `Job.statepoint` is accessed. Provide a faster path for
cached *read* access to the statepoint dict via the new property `Job.statepoint_dict`.

One side effect of this change is that some warnings that were previously issued during
`Job.__init__` are now deferred until `statepoint` is accessed (see changes in tests/).

There are additional opportunities to use the cached statepoint dict in
`Project.groupby` that this commit does not address.
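
For illustration, a hedged sketch of the read fast path this enables (shown with the
final property name `Job.cached_statepoint`; the state point key is hypothetical):

```python
import signac

project = signac.get_project()  # assumes a signac project in the current directory

for job in project:
    # Fast path: read-only access backed by the state point cache; no
    # _StatePointDict is initialized for this.
    temperature = job.cached_statepoint["temperature"]  # hypothetical key

    # Slow path: the fully synced state point, only needed for writes.
    # job.statepoint["temperature"] = 300
```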

* Make JobsCursor.__len__ and .__contains__ O(1).

Cache the ids matching the job filter. This enables O(1) cost for __len__ and
__contains__ as users would expect. In some use-cases, signac-flow repeatedly calls
__contains__ on a JobsCursor.

The side effect of this change is that modifications to the workspace will not be
reflected in existing JobsCursor instances. This behavior was not previously documented
in the user API.
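
A sketch of the access pattern this makes cheap (the filter key is hypothetical):

```python
import signac

project = signac.get_project()
cursor = project.find_jobs({"temperature": 300})  # matching ids are cached here

print(len(cursor))     # O(1): uses the cached id list, no workspace re-scan
job = next(iter(cursor))
print(job in cursor)   # O(1): membership test against the cached ids
```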

* Add validate_statepoint argument to Job.init()

`with job`, `Job.document`, and `Job.stores` call `init()` because they require that the
job directory exists. Prior to this change, `init()` also forced a load of the
`_StatePointDict`. These methods now call `init(validate_statepoint=False)`, which exits
early when the job directory exists.

This change provides a reasonable performance boost (5x on NVMe, more on network
storage). There may be more room for improvement, as there are currently 2N stat calls
in this use case:
```python
for job in project:
    with job:
        pass
```
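
With the new argument, callers that only need the job directory to exist can also skip
state point validation explicitly; a hedged usage sketch:

```python
import signac

project = signac.get_project()

for job in project:
    # Returns early when the job directory already exists; the state point
    # file is not read or validated on this path.
    job.init(validate_statepoint=False)
```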

* Rename statepoint_dict to statepoint_mapping.

`deepcopy` is unexpectedly expensive. Refactor the earlier commit to deepcopy only
user-provided statepoint dicts. Statepoints from the cache are passed to the user
read-only via `MappingProxyType`.
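
The read-only view uses the standard library's `MappingProxyType`; a minimal
illustration (not the exact signac code, values are hypothetical):

```python
from types import MappingProxyType

# A state point dict as it might come from the project cache.
statepoint = {"temperature": 300, "pressure": 1.0}
readonly = MappingProxyType(statepoint)

print(readonly["pressure"])   # reads work normally, no deepcopy required
# readonly["pressure"] = 2.0  # would raise TypeError: item assignment not supported
```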

* Read the cache from disk in `open_job`.

`open_job` uses the statepoint cache to improve performance. Read the
cache from disk in `open_job` (if it has not already been read). This provides
consistently high performance in cases where `open_job` is called before any other
method that may have triggered `_get_statepoint`.
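
A hedged usage sketch (the job id shown is hypothetical; `Project.update_cache` and
`signac update-cache` are the existing ways to refresh the on-disk cache):

```python
import signac

project = signac.get_project()
project.update_cache()  # or run `signac update-cache` on the command line

# Opening a job by id can now be served from the on-disk cache, which is
# read lazily the first time it is needed.
job = project.open_job(id="0123456789abcdef0123456789abcdef")  # hypothetical id
print(job.cached_statepoint)
```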

* Restore cache miss logger level to debug.

Users may find the messages too verbose. At the same time, users might never realize
that they should run `signac update-cache` without this message...

* Instantiate Job by id directly when iterating over ids.

`open_job` is a user-facing function and performs error checking on the id. This check
involves a stat call to verify the job directory exists. When `Project` is looping over
ids from `_get_job_ids`, the directory is known to exist (subject to race conditions).
`stat` calls are expensive, especially on networked filesystems.

Instantiating `Job` directly bypasses this check in `open_job`.
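
A simplified sketch of the internal iteration path described above (`_get_job_ids` and
the `Job` constructor arguments are signac internals referenced in this commit):

```python
from signac.job import Job

def _iter_jobs(project):
    # The ids come straight from the workspace listing, so each job directory
    # is known to exist and open_job()'s stat-based check can be skipped by
    # constructing Job directly.
    for job_id in project._get_job_ids():
        yield Job(project=project, id_=job_id, directory_known=True)
```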

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add statepoint_mapping test.

* Pass information about the Job directory's existence from Project to Job.

This allows Job to avoid some `stat` calls, which greatly improves performance on
networked file systems.

* Populate _statepoint_mapping in additional code paths.

These missed opportunities to pre-populate _statepoint_mapping triggered slow code
paths.

* Increase test coverage.

* Update change log.

* Rename statepoint_mapping to cached_statepoint.

Also attempt to fix sphinx-doc links.

* Doc fixes.

* Update code comments

* Use cached_statepoint in to_dataframe.

* Restore iteration order.

Python sets do not guarantee iteration order. Preserve the original iteration
order with a list and convert to a set only for `__contains__` checks.
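
A self-contained sketch of the list-plus-set pattern (illustrative only, not the exact
JobsCursor code):

```python
class _CachedIds:
    """Keep insertion order for iteration; use a set for O(1) membership."""

    def __init__(self, ids):
        self._ids = list(ids)          # preserves the original iteration order
        self._id_set = set(self._ids)  # fast __contains__ checks

    def __iter__(self):
        return iter(self._ids)

    def __contains__(self, id_):
        return id_ in self._id_set

    def __len__(self):
        return len(self._ids)
```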

* Validate cached_statepoint when read from disk.

This has the added benefit of validating all statepoints that are
added to the cache. I needed to add a validate argument to update_cache
because one of the unit tests relies on adding invalid statepoints
to the cache.

* Use cached_statepoint in groupby.

* Remove validate argument from update_cache.

Locally catch the JobsCorruptedError and ignore it in the test that needs to.

* Write state point as two words in doc strings

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Corwin Kerr <cbkerr@umich.edu>
3 people authored Feb 13, 2024
1 parent 5a82c4c commit 2d6db63
Showing 7 changed files with 241 additions and 64 deletions.
5 changes: 4 additions & 1 deletion changelog.txt
@@ -7,20 +7,23 @@ The **signac** package follows `semantic versioning <https://semver.org/>`_.
Version 2
=========

[2.2.0] -- 2023-xx-xx
[2.2.0] -- 2024-xx-xx
---------------------

Added
+++++

- Official support for Python 3.12 (#957).
- ``Job.cached_statepoint`` - cached and read only access to job state points. Faster than
``Job.statepoint`` (#975).

Changed
+++++++

- Restrict allowable tar file features in Python 3.12 (#957).
- linked views now can contain spaces and other characters except directory separators (#926).
- linked views now can be created on Windows, if 'Developer mode' is enabled (#430).
- Increase performance for many usage patterns (#975).

Fixed
+++++
1 change: 1 addition & 0 deletions doc/api.rst
@@ -78,6 +78,7 @@ The Job class

.. autosummary::

Job.cached_statepoint
Job.clear
Job.close
Job.data
110 changes: 89 additions & 21 deletions signac/job.py
@@ -11,6 +11,7 @@
import shutil
from copy import deepcopy
from threading import RLock
from types import MappingProxyType
from typing import FrozenSet

from synced_collections.backends.collection_json import (
@@ -248,7 +249,8 @@ class Job:
Jobs can be opened by ``statepoint`` or ``id_``. If both values are
provided, it is the user's responsibility to ensure that the values
correspond.
correspond. Set ``directory_known`` to ``True`` when the job directory
is known to exist - this skips some expensive isdir checks.
Parameters
----------
@@ -258,6 +260,8 @@ class Job:
State point for the job. (Default value = None)
id_ : str, optional
The job identifier. (Default value = None)
directory_known : bool, optional
Set to true when the job directory is known to exist. (Default value = False)
"""

@@ -274,30 +278,34 @@
KEY_DATA = "signac_data"
"The job's datastore key."

def __init__(self, project, statepoint=None, id_=None):
def __init__(self, project, statepoint=None, id_=None, directory_known=False):
self._project = project
self._lock = RLock()
self._initialize_lazy_properties()
self._directory_known = directory_known

if statepoint is None and id_ is None:
raise ValueError("Either statepoint or id_ must be provided.")
elif statepoint is not None:
self._statepoint_requires_init = False
try:
self._id = calc_id(statepoint) if id_ is None else id_
except TypeError:
raise KeyTypeError
self._statepoint = _StatePointDict(
jobs=[self], filename=self._statepoint_filename, data=statepoint
)

# Update the project's state point cache immediately if opened by state point
self._project._register(self.id, statepoint)
self._cached_statepoint = statepoint
self._statepoint_requires_init = True
else:
# Only an id was provided. State point will be loaded lazily.
self._id = id_
self._statepoint_requires_init = True

# Fetch the cached statepoint from the project's cache. Don't load it
# from disk on a cache miss (will be loaded on demand).
try:
self._cached_statepoint = project._sp_cache[id_]
except KeyError:
self._cached_statepoint = None

def _initialize_lazy_properties(self):
"""Initialize all properties that are designed to be loaded lazily."""
with self._lock:
@@ -334,7 +342,7 @@ def __str__(self):

def __repr__(self):
return "{}(project={}, statepoint={})".format(
self.__class__.__name__, repr(self._project), self.statepoint
self.__class__.__name__, repr(self._project), self.cached_statepoint
)

@property
@@ -406,6 +414,33 @@ def update_statepoint(self, update, overwrite=False):
statepoint.update(update)
self.statepoint = statepoint

@property
def cached_statepoint(self):
"""Get a copy of the job's state point as a read-only mapping.
:py:attr:`cached_statepoint` uses the state point cache to provide fast access to
the job's state point for reading.
.. note::
Create and update the state point cache by calling
:py:meth:`project.update_cache <signac.Project.update_cache>`
or running ``signac update-cache`` on the command line.
.. seealso::
Use :py:attr:`statepoint` to modify the job's state point.
Returns
-------
Mapping
Returns the job's state point.
"""
if self._cached_statepoint is None:
self._cached_statepoint = self._project._get_statepoint(self._id)

return MappingProxyType(self._cached_statepoint)

@property
def statepoint(self):
"""Get or set the job's state point.
@@ -416,6 +451,11 @@ def statepoint(self):
`Modifying the State Point
<https://docs.signac.io/en/latest/jobs.html#modifying-the-state-point>`_.
.. tip::
Use :py:attr:`cached_statepoint` for fast read-only access to the
state point.
.. warning::
The state point object behaves like a dictionary in most cases,
@@ -443,14 +483,25 @@ def statepoint(self):
"""
with self._lock:
if self._statepoint_requires_init:
# Load state point data lazily (on access).
self._statepoint = _StatePointDict(
jobs=[self], filename=self._statepoint_filename
)
statepoint = self._statepoint.load(self.id)
if self._cached_statepoint is None:
# Load state point data lazily (on access).
self._statepoint = _StatePointDict(
jobs=[self],
filename=self._statepoint_filename,
)
statepoint = self._statepoint.load(self.id)

# Update the project's state point cache when loaded lazily
self._project._register(self.id, statepoint)
self._cached_statepoint = statepoint
else:
# Create _StatePointDict lazily with a known statepoint dict.
self._statepoint = _StatePointDict(
jobs=[self],
filename=self._statepoint_filename,
data=self._cached_statepoint,
)

# Update the project's state point cache when loaded lazily
self._project._register(self.id, statepoint)
self._statepoint_requires_init = False

return self._statepoint
@@ -510,7 +561,7 @@ def document(self):
"""
with self._lock:
if self._document is None:
self.init()
self.init(validate_statepoint=False)
fn_doc = os.path.join(self.path, self.FN_DOCUMENT)
self._document = BufferedJSONAttrDict(
filename=fn_doc, write_concern=True
@@ -591,9 +642,9 @@ def stores(self):
"""
with self._lock:
if self._stores is None:
self.init()
self.init(validate_statepoint=False)
self._stores = H5StoreManager(self.path)
return self.init()._stores
return self._stores

@property
def data(self):
@@ -640,7 +691,7 @@ def project(self):
"""
return self._project

def init(self, force=False):
def init(self, force=False, validate_statepoint=True):
"""Initialize the job's workspace directory.
This function will do nothing if the directory and the job state point
@@ -656,6 +707,10 @@ def init(self, force=False):
Overwrite any existing state point files, e.g., to repair them if
they got corrupted (Default value = False).
validate_statepoint : bool, optional
When True (the default), load the job state point and ensure that it matches
the id. When False, exit early when the job directory exists.
Returns
-------
Job
@@ -671,6 +726,15 @@
"""
with self._lock:
try:
# Fast early exit when not validating.
if not validate_statepoint:
if self._directory_known:
return self

if os.path.isdir(self.path):
self._directory_known = True
return self

# Attempt early exit if the state point file exists and is valid.
try:
statepoint = self.statepoint.load(self.id)
@@ -687,6 +751,8 @@
)
raise

self._directory_known = True

# The state point save will not overwrite an existing file on
# disk unless force is True, so the subsequent load will catch
# when a preexisting invalid file was present.
@@ -760,6 +826,8 @@ def remove(self):
self._document = None
self._stores = None

self._directory_known = False

def move(self, project):
"""Move this job to project.
@@ -899,7 +967,7 @@ def open(self):
"""
self._cwd.append(os.getcwd())
self.init()
self.init(validate_statepoint=False)
logger.info(f"Enter workspace '{self.path}'.")
os.chdir(self.path)
