Merge pull request #814 from materialsproject/bugfix/store_close
Add explicit store close to resources

Jason Munro authored Jul 6, 2023
2 parents c0b52b1 + 36434d7 commit 4675761
Showing 55 changed files with 405 additions and 320 deletions.
22 changes: 3 additions & 19 deletions .github/workflows/testing.yml
@@ -25,27 +25,11 @@ jobs:

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-testing.txt
- name: Lint with pycodestyle
run: |
pip install pycodestyle
pycodestyle src/maggma
- name: Lint with mypy
run: |
pip install mypy mypy-boto3
mypy src/maggma
pip install pre-commit
- name: Lint with flake8
- name: Run pre-commit
run: |
pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 --count --show-source --statistics src/maggma
# exit-zero treats all errors as warnings.
flake8 --count --exit-zero --max-complexity=20 --statistics src/maggma
pre-commit run --all-files --show-diff-on-failure
test:
needs: lint
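For contributors who want to reproduce the new lint step locally, the equivalent invocation (using the same commands as the workflow above) would be:

```
pip install pre-commit
pre-commit run --all-files --show-diff-on-failure
```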
9 changes: 0 additions & 9 deletions .pre-commit-config.yaml
@@ -19,16 +19,7 @@ repos:
hooks:
- id: check-case-conflict
- id: check-symlinks
- id: check-yaml
- id: destroyed-symlinks
- id: end-of-file-fixer
- id: mixed-line-ending
- id: trailing-whitespace

- repo: https://github.com/codespell-project/codespell
rev: v2.2.4
hooks:
- id: codespell
stages: [commit, commit-msg]
exclude_types: [json, bib, svg]
args: [--ignore-words-list, "mater,fwe,te"]
3 changes: 1 addition & 2 deletions docs/concepts.md
@@ -5,7 +5,7 @@ modular data pipelines. Data resides in one or more `Store` and is processed by
`Builder`. The results of the processing are saved in another `Store`, and so on:

```mermaid
flowchart LR
    s1(Store 1) --Builder 1--> s2(Store 2) --Builder 2--> s3(Store 3)
    s2 --Builder 3--> s4(Store 4)
```
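As a rough sketch of this pattern (the `MultiplyBuilder` below is hypothetical; `Builder` requires implementing `get_items` and `update_targets`, and the stores here are in-memory for illustration):

```python
from maggma.core import Builder
from maggma.stores import MemoryStore


class MultiplyBuilder(Builder):
    """Hypothetical builder that reads docs from a source store,
    doubles their 'value' field, and writes them to a target store."""

    def get_items(self):
        # IO with the source store happens here
        return self.sources[0].query()

    def process_item(self, item):
        # pure data transformation, no IO
        return {"task_id": item["task_id"], "value": 2 * item["value"]}

    def update_targets(self, items):
        # IO with the target store happens here
        self.targets[0].update(items, key="task_id")


s1, s2 = MemoryStore("store_1"), MemoryStore("store_2")
builder = MultiplyBuilder(sources=[s1], targets=[s2])
builder.run()
```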
@@ -35,4 +35,3 @@ Both `get_items` and `update_targets` can perform IO (input/output) to the data
Another challenge in building complex data-transformation codes is keeping track of all the settings necessary to produce an output database. One bad solution is to hard-code these settings, but then any modification becomes difficult to track.

Maggma solves this by putting the configuration with the pipeline definition in JSON or YAML files. This is done using the `MSONable` pattern, which requires that any Maggma object (the databases and transformation steps) can convert itself to a Python dictionary with its configuration parameters, in a process called serialization. These dictionaries can then be converted back to the original Maggma object without having to know which class they belonged to. `MSONable` does this by injecting `@class` and `@module` keys that tell it where to find the original Python code for that Maggma object.
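A minimal sketch of that round trip, using a `MemoryStore` as the example object (the store name is illustrative):

```python
from monty.json import MontyDecoder

from maggma.stores import MemoryStore

store = MemoryStore("tasks")
d = store.as_dict()  # serialize: a plain dict carrying the config
print(d["@module"], d["@class"])  # injected keys locating the original class
store2 = MontyDecoder().process_decoded(d)  # deserialize without knowing the class
assert isinstance(store2, MemoryStore)
```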

6 changes: 3 additions & 3 deletions docs/getting_started/running_builders.md
@@ -132,12 +132,12 @@ A basic invocation of the memory profiler using the `mrun` command line tool wou
mrun --memray on my_builder.json
```

The profiler will generate two files after the builder finishes:
1. An output `.bin` file that is dumped by default into the `temp` directory, which is platform/OS dependent. For Linux/MacOS this will be `/tmp/`, and for Windows the target directory will be `C:\TEMP\`. The output file will have a generic naming pattern as follows: `BUILDER_NAME_PASSED_TO_MRUN + BUILDER_START_DATETIME_ISO.bin`, e.g., `my_builder.json_2023-06-09T13:57:48.446361.bin`.
2. A `.html` flamegraph file that will be written to the same directory as the `.bin` dump file. The flamegraph will have a naming pattern similar to the following: `memray-flamegraph-my_builder.json_2023-06-09T13:57:48.446361.html`. The flamegraph can be viewed using any web browser.
***Note***: Different platforms/operating systems purge their system's `temp` directory at different intervals. It is recommended to move at least the `.bin` file to a more stable location. The `.bin` file can be used to recreate the flamegraph at any time using the Memray CLI.
Using the flag `--memray-dir` (`-md`) allows you to specify an output directory for the `.bin` and `.html` files created by the profiler. The provided directory will be created if it does not exist, mimicking the `mkdir -p` command.
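For example, a hypothetical invocation that writes both files to a local `memray_profiles` directory:

```
mrun --memray on --memray-dir ./memray_profiles my_builder.json
```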
Further data visualization and transform examples can be found in Memray's documentation ([Memray reporters](https://bloomberg.github.io/memray/live.html)).
8 changes: 4 additions & 4 deletions docs/getting_started/using_file_store.md
@@ -20,7 +20,7 @@ To create a `FileStore`, simply pass the path to the top-level directory that co
```
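A minimal sketch, assuming your files live under `/path/to/data`:

```python
from maggma.stores import FileStore

fs = FileStore("/path/to/data")
fs.connect()  # indexes every file in the directory tree
```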

On `connect()`, `FileStore` iterates through all files in the base directory and
all subdirectories. For each file, it creates a dict-like record based on the file's metadata, such as name, size, last modification date, etc. These records are kept in
memory using an internal `MemoryStore`. An example record is shown below.

```python
@@ -119,7 +119,7 @@ for d in docs:
fs.update(docs)
```

The above steps will result in the following contents being added to the `.json` file. This metadata will be automatically read back in the next time you connect to the Store.

```json
[{"path":".../file_store_test/calculation2/input.in",
@@ -167,7 +167,7 @@ Note that when using any of the above methods, you cannot modify any keys that a

### Orphaned Metadata

In the course of working with `FileStore`, you may encounter a situation where metadata records stored in the JSON file no longer match files on disk. This can happen if, for example, you initialize a `FileStore` and later delete a file, or if you initialize the store with the default arguments but later restrict the file selection with `max_depth` or `file_filters`.

These orphaned metadata records will appear in the `FileStore` with the field `{"orphan": True}`. The goal with this behavior is to preserve all metadata the user may have added and prevent data loss.
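A quick way to inspect these records, assuming the connected store `fs` from earlier:

```python
orphans = list(fs.query({"orphan": True}))
```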

@@ -200,4 +200,4 @@ Keep in mind that `get_items` will return documents like the one shown in (#crea
- etc.

Once you can process data on your disk with a `Builder`, you can send that data
to any kind of `Store` you like - another `FileStore`, a database, etc.
2 changes: 1 addition & 1 deletion requirements-testing.txt
@@ -12,4 +12,4 @@ mypy-extensions==0.4.3
responses<0.22.0
types-PyYAML==6.0.11
types-setuptools==65.4.0.0
types-python-dateutil==2.8.19.2
1 change: 0 additions & 1 deletion src/maggma/api/API.py
@@ -8,7 +8,6 @@
from monty.json import MSONable
from starlette.responses import RedirectResponse

from maggma import __version__
from maggma.api.resource import Resource


19 changes: 14 additions & 5 deletions src/maggma/api/models.py
@@ -19,15 +19,19 @@ class Meta(BaseModel):
"""

api_version: str = Field(
__version__, description="a string containing the version of the Materials API " "implementation, e.g. v0.9.5",
__version__,
description="a string containing the version of the Materials API "
"implementation, e.g. v0.9.5",
)

time_stamp: datetime = Field(
description="a string containing the date and time at which the query was executed",
default_factory=datetime.utcnow,
)

total_doc: Optional[int] = Field(None, description="the total number of documents available for this query", ge=0)
total_doc: Optional[int] = Field(
None, description="the total number of documents available for this query", ge=0
)

class Config:
extra = "allow"
@@ -52,7 +56,9 @@ class Response(GenericModel, Generic[DataT]):
"""

data: Optional[List[DataT]] = Field(None, description="List of returned data")
errors: Optional[List[Error]] = Field(None, description="Any errors on processing this query")
errors: Optional[List[Error]] = Field(
None, description="Any errors on processing this query"
)
meta: Optional[Meta] = Field(None, description="Extra information for the query")

@validator("errors", always=True)
@@ -82,9 +88,12 @@ class S3URLDoc(BaseModel):
"""

url: str = Field(
..., description="Pre-signed download URL",
...,
description="Pre-signed download URL",
)

requested_datetime: datetime = Field(..., description="Datetime for when URL was requested")
requested_datetime: datetime = Field(
..., description="Datetime for when URL was requested"
)

expiry_datetime: datetime = Field(..., description="Expiry datetime of the URL")
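For context, a hedged sketch of how the `Response` model above might be used directly (the `Task` model is hypothetical; assumes pydantic v1 generics, which this module targets):

```python
from pydantic import BaseModel

from maggma.api.models import Meta, Response


class Task(BaseModel):
    task_id: str


resp = Response[Task](data=[Task(task_id="mp-149")], meta=Meta())
print(resp.meta.api_version)
```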
10 changes: 10 additions & 0 deletions src/maggma/api/query_operator/__init__.py
@@ -4,3 +4,13 @@
from maggma.api.query_operator.sorting import SortQuery
from maggma.api.query_operator.sparse_fields import SparseFieldsQuery
from maggma.api.query_operator.submission import SubmissionQuery

__all__ = [
"QueryOperator",
"NumericQuery",
"StringQueryOperator",
"PaginationQuery",
"SortQuery",
"SparseFieldsQuery",
"SubmissionQuery",
]
3 changes: 0 additions & 3 deletions src/maggma/api/query_operator/dynamic.py
@@ -21,7 +21,6 @@ def __init__(
fields: Optional[List[str]] = None,
excluded_fields: Optional[List[str]] = None,
):

self.model = model
self.fields = fields
self.excluded_fields = excluded_fields
@@ -119,7 +118,6 @@ class NumericQuery(DynamicQueryOperator):
def field_to_operator(
self, name: str, field: ModelField
) -> List[Tuple[str, Any, Query, Callable[..., Dict]]]:

"""
Converts a Pydantic ModelField into a Tuple with the
query_param name,
@@ -215,7 +213,6 @@ class StringQueryOperator(DynamicQueryOperator):
def field_to_operator(
self, name: str, field: ModelField
) -> List[Tuple[str, Any, Query, Callable[..., Dict]]]:

"""
Converts a Pydantic ModelField into a Tuple with the
query_param name,
4 changes: 2 additions & 2 deletions src/maggma/api/query_operator/pagination.py
@@ -30,7 +30,8 @@ def query(
f" Limited to {max_limit}.",
),
_skip: int = Query(
0, description="Number of entries to skip in the search.",
0,
description="Number of entries to skip in the search.",
),
_limit: int = Query(
default_limit,
@@ -43,7 +44,6 @@
"""

if _page is not None:

if _per_page > max_limit:
raise HTTPException(
status_code=400,
5 changes: 2 additions & 3 deletions src/maggma/api/query_operator/sorting.py
@@ -1,6 +1,6 @@
from typing import Optional, List
from typing import Optional

from fastapi import HTTPException, Query
from fastapi import Query

from maggma.api.query_operator import QueryOperator
from maggma.api.utils import STORE_PARAMS
@@ -19,7 +19,6 @@ def query(
Prefixing '-' to a field will force a sort in descending order.",
),
) -> STORE_PARAMS:

sort = {}

if _sort_fields:
2 changes: 0 additions & 2 deletions src/maggma/api/query_operator/submission.py
@@ -13,7 +13,6 @@ class SubmissionQuery(QueryOperator):
"""

def __init__(self, status_enum):

self.status_enum = status_enum

def query(
@@ -25,7 +24,6 @@ def query(
description="Minimum datetime of status update for submission",
),
) -> STORE_PARAMS:

crit = {} # type: dict

if state:
12 changes: 12 additions & 0 deletions src/maggma/api/resource/__init__.py
@@ -10,3 +10,15 @@
from maggma.api.resource.read_resource import ReadOnlyResource, attach_query_ops
from maggma.api.resource.submission import SubmissionResource
from maggma.api.resource.s3_url import S3URLResource

__all__ = [
"Resource",
"HintScheme",
"HeaderProcessor",
"AggregationResource",
"PostOnlyResource",
"ReadOnlyResource",
"attach_query_ops",
"SubmissionResource",
"S3URLResource",
]
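As a rough sketch of how these resources and the query operators above fit together (the `TaskDoc` model is hypothetical, and the store is in-memory for illustration):

```python
from pydantic import BaseModel, Field

from maggma.api.query_operator import PaginationQuery, SparseFieldsQuery
from maggma.api.resource import ReadOnlyResource
from maggma.stores import MemoryStore


class TaskDoc(BaseModel):
    task_id: str = Field(..., description="Unique task identifier")


resource = ReadOnlyResource(
    MemoryStore("tasks"),
    TaskDoc,
    query_operators=[PaginationQuery(), SparseFieldsQuery(TaskDoc)],
)
```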
1 change: 0 additions & 1 deletion src/maggma/api/resource/aggregation.py
@@ -63,7 +63,6 @@ def prepare_endpoint(self):
self.build_dynamic_model_search()

def build_dynamic_model_search(self):

model_name = self.model.__name__

def search(**queries: Dict[str, STORE_PARAMS]) -> Dict:
4 changes: 2 additions & 2 deletions src/maggma/api/resource/core.py
@@ -17,7 +17,8 @@ class Resource(MSONable, metaclass=ABCMeta):
"""

def __init__(
self, model: Type[BaseModel],
self,
model: Type[BaseModel],
):
"""
Args:
@@ -80,7 +81,6 @@ def as_dict(self) -> Dict:

@classmethod
def from_dict(cls, d: Dict):

if isinstance(d["model"], str):
d["model"] = dynamic_import(d["model"])
d = {k: MontyDecoder().process_decoded(v) for k, v in d.items()}
32 changes: 24 additions & 8 deletions src/maggma/api/resource/post_resource.py
@@ -79,22 +79,27 @@ def prepare_endpoint(self):
self.build_dynamic_model_search()

def build_dynamic_model_search(self):

model_name = self.model.__name__

def search(**queries: Dict[str, STORE_PARAMS]) -> Dict:
request: Request = queries.pop("request") # type: ignore
queries.pop("temp_response") # type: ignore

query_params = [
entry for _, i in enumerate(self.query_operators) for entry in signature(i.query).parameters
entry
for _, i in enumerate(self.query_operators)
for entry in signature(i.query).parameters
]

overlap = [key for key in request.query_params.keys() if key not in query_params]
overlap = [
key for key in request.query_params.keys() if key not in query_params
]
if any(overlap):
raise HTTPException(
status_code=400,
detail="Request contains query parameters which cannot be used: {}".format(", ".join(overlap)),
detail="Request contains query parameters which cannot be used: {}".format(
", ".join(overlap)
),
)

query: Dict[Any, Any] = merge_queries(list(queries.values())) # type: ignore
@@ -105,18 +110,26 @@ def search(**queries: Dict[str, STORE_PARAMS]) -> Dict:
try:
with query_timeout(self.timeout):
count = self.store.count( # type: ignore
**{field: query[field] for field in query if field in ["criteria", "hint"]}
**{
field: query[field]
for field in query
if field in ["criteria", "hint"]
}
)

if isinstance(self.store, S3Store):
data = list(self.store.query(**query)) # type: ignore
else:

pipeline = generate_query_pipeline(query, self.store)

data = list(
self.store._collection.aggregate(
pipeline, **{field: query[field] for field in query if field in ["hint"]}
pipeline,
**{
field: query[field]
for field in query
if field in ["hint"]
},
)
)
except (NetworkTimeout, PyMongoError) as e:
@@ -129,7 +142,10 @@
raise HTTPException(
status_code=500,
detail="Server timed out trying to obtain data. Try again with a smaller request, "
"or remove sorting fields and sort data locally.",)
"or remove sorting fields and sort data locally.",
)

self.store.close()

operator_meta = {}

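The net effect of this change is that each request now releases its store connection when the handler finishes. A minimal sketch of that lifecycle, using an in-memory store for illustration:

```python
from maggma.stores import MemoryStore

store = MemoryStore("temp")
store.connect()
try:
    results = list(store.query(criteria={}))
finally:
    store.close()  # the explicit close this PR adds after each query
```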