feat: Add data to dataset via object with read() method #540

Merged · 7 commits · Mar 12, 2024
2 changes: 1 addition & 1 deletion python/lib/core/dmod/core/_version.py
@@ -1 +1 @@
-__version__ = '0.13.0'
+__version__ = '0.13.1'
7 changes: 7 additions & 0 deletions python/lib/core/dmod/core/common/reader.py
@@ -0,0 +1,7 @@
+from typing_extensions import Protocol, runtime_checkable
+
+
+@runtime_checkable
+class Reader(Protocol):
+    def read(self, size: int = -1, /) -> bytes:
+        """Read and return up to ``size`` bytes; returning an empty ``b''`` signals EOF."""
10 changes: 6 additions & 4 deletions python/lib/core/dmod/core/dataset.py
@@ -14,6 +14,8 @@
from pydantic.fields import ModelField
from uuid import UUID, uuid4

+from .common.reader import Reader


class DatasetType(PydanticEnum):
UNKNOWN = (-1, False, lambda dataset: None)
@@ -530,7 +532,7 @@ def __init__(self, uuid: Optional[UUID] = None, datasets: Optional[Dict[str, Dat
# TODO: implement functions and routines for scrubbing temporary datasets as needed

@abstractmethod
-    def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, source: Optional[str] = None,
+    def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
is_temp: bool = False, **kwargs) -> bool:
"""
Add data in some format to the dataset.
@@ -545,9 +547,9 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
dest : str
A path-like string specifying a location within the dataset (e.g., file, object, sub-URL) where the data
should be added.
-        data : Optional[bytes]
-            Optional encoded byte string containing data to be inserted into the data set; either this or ``source``
-            must be provided.
+        data : Optional[Union[bytes, Reader]]
+            Optional encoded byte string, or object with a ``read()`` method returning bytes, containing data to be
+            inserted into the data set; either this or ``source`` must be provided.
source : Optional[str]
Optional string specifying a location from which to source the data to be added; either this or ``data``
must be provided.
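
With this change, callers of any ``DatasetManager`` can stream data rather than materializing it in memory first. A hedged usage sketch — ``manager`` stands in for an already-configured concrete ``DatasetManager`` subclass, and the dataset and file names are illustrative:

# Small payloads can still be passed as raw bytes:
manager.add_data("my-dataset", dest="config.yml", data=b"key: value\n")

# Larger payloads can be any object whose read() method returns bytes;
# a file opened in binary mode satisfies the Reader protocol:
with open("forcings.nc", "rb") as fh:
    manager.add_data("my-dataset", dest="forcings.nc", data=fh)
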
2 changes: 1 addition & 1 deletion python/lib/modeldata/dmod/modeldata/_version.py
@@ -1 +1 @@
-__version__ = '0.9.4'
+__version__ = '0.9.5'
41 changes: 28 additions & 13 deletions python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
@@ -5,12 +5,13 @@

from dmod.core.meta_data import DataCategory, DataDomain
from dmod.core.dataset import Dataset, DatasetManager, DatasetType, InitialDataAdder
+from dmod.core.common.reader import Reader
from datetime import datetime, timedelta
from minio import Minio
from minio.api import ObjectWriteResult
from minio.deleteobjects import DeleteObject
from pathlib import Path
-from typing import Dict, List, Optional, Set, Tuple
+from typing import Dict, List, Optional, Set, Tuple, Union
from uuid import UUID


@@ -173,14 +174,14 @@ def _push_files(self, bucket_name: str, dir_path: Path, recursive: bool = True,
self.persist_serialized(bucket_name)

# TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
-    def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, source: Optional[str] = None,
+    def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
is_temp: bool = False, **kwargs) -> bool:
"""
Add raw data or data from one or more files to the object store for the given dataset.

-        Function adds either a binary data, data from a single file, or data from all files within a supplied directory,
-        to the backing object store of the given dataset. The dataset name must be recognized; if it is not, ``False``
-        is immediately returned.
+        Function adds binary data, data from a single file, data from an object with a ``read()`` method that returns
+        bytes, or data from all files within a supplied directory, to the backing object store of the given dataset.
+        The dataset name must be recognized; if it is not, ``False`` is immediately returned.

Binary data must be added to a specified object, supplied by ``dest``. A single ``source`` file may be pushed
either to an explicitly named ``dest`` object, or to an object with a name derived from a ``bucket_root`` as
@@ -215,9 +216,9 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
A path-like string that provides information on the location within the dataset where the data should be
added when either adding byte string data from ``data`` or when adding from a single file specified in
``source`` (ignored when adding from files within a ``source`` directory).
-        data : Optional[bytes]
-            Optional encoded byte string containing data to be inserted into the data set; either this or ``source``
-            must be provided.
+        data : Optional[Union[bytes, Reader]]
+            Optional encoded byte string, or object with a ``read()`` method returning bytes, containing data to be
+            inserted into the data set; either this or ``source`` must be provided.
source : Optional[str]
Optional string specifying either a source file containing data to be added, or a directory containing
multiple files to be added.
@@ -249,8 +250,15 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
retain_until_date=datetime.now() + timedelta(hours=1))
else:
retention = None
-            result = self._client.put_object(bucket_name=dataset_name, data=io.BytesIO(data), length=len(data),
-                                             object_name=dest, retention=retention)
+            if isinstance(data, Reader):
+                # Use AWS S3 default part size of 5MiB
+                # https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+                part_size = 5 * 1024 * 1024
+                result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size,
+                                                 object_name=dest, retention=retention)
+            else:
+                result = self._client.put_object(bucket_name=dataset_name, data=io.BytesIO(data), length=len(data),
+                                                 object_name=dest, retention=retention)
# TODO: do something more intelligent than this for determining success
return result.bucket_name == dataset_name
elif is_temp:
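
For context on the streaming branch above: MinIO's Python client accepts ``length=-1`` together with an explicit ``part_size`` when the size of a stream is not known in advance, uploading it as a multipart object; S3 requires every part except the last to be at least 5 MiB, which is why that value is chosen. A standalone sketch against a raw ``Minio`` client — the endpoint, credentials, bucket, and file names are placeholders:

from minio import Minio

client = Minio("play.min.io", access_key="<access-key>", secret_key="<secret-key>", secure=True)

with open("large_forcings.nc", "rb") as stream:
    # length=-1 signals an unknown size; part_size must be at least 5 MiB
    result = client.put_object(bucket_name="example-bucket", object_name="large_forcings.nc",
                               data=stream, length=-1, part_size=5 * 1024 * 1024)

print(result.bucket_name, result.object_name, result.etag)
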
@@ -328,9 +336,16 @@ def create(self, name: str, category: DataCategory, domain: DataDomain, is_read_
raise e
created_on = datetime.now()
access_loc = "{}://{}/{}".format('https' if self._secure_connection else 'http', self._obj_store_host_str, name)
-        dataset = Dataset(name=name, category=category, data_domain=domain, dataset_type=DatasetType.OBJECT_STORE,
-                          manager=self, access_location=access_loc, is_read_only=is_read_only, created_on=created_on,
-                          last_updated=created_on, expires_on=expires_on)
+        try:
+            dataset = Dataset(name=name, category=category, data_domain=domain, dataset_type=DatasetType.OBJECT_STORE,
+                              manager=self, access_location=access_loc, is_read_only=is_read_only, created_on=created_on,
+                              last_updated=created_on, expires_on=expires_on)
+        except Exception as e:
+            # if there is an issue creating the dataset, then remove the bucket
+            self._client.remove_bucket(name)
+            # TODO: may need to log something here
+            self.errors.append(e)
+            raise e

# Once dataset is added to ``datasets``, it's "managed," so calls to add_data, delete, etc., should work
self.datasets[name] = dataset
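
The ``create`` change above is a cleanup-on-failure pattern: the external resource (the bucket) is acquired first, then released if the subsequent in-memory validation raises, so a failed call leaves no orphaned bucket behind. A generic sketch of the pattern, with illustrative names not taken from this PR:

def create_with_rollback(acquire, release, build):
    """Acquire a resource, build the dependent object, and roll back on failure."""
    resource = acquire()
    try:
        return build(resource)
    except Exception:
        # Building/validating failed after the resource was created; undo it
        release(resource)
        raise

Here ``acquire`` corresponds to the bucket creation earlier in ``create``, ``release`` to ``remove_bucket``, and ``build`` to constructing the pydantic ``Dataset``, whose validation can raise. The new integration tests below exercise exactly this behavior.
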
39 changes: 39 additions & 0 deletions python/lib/modeldata/dmod/test/it_object_store_dataset_manager.py
@@ -227,6 +227,45 @@ def test_create_1_b(self):
result = self.minio_client.get_object(bucket_name=dataset_name, object_name=serial_file_name)
self.assertIsNotNone(result)

+    def test_create_1_c(self):
+        """
+        Test that create does not create a new dataset if a dataset already exists with that name.
+        """
+        ex_num = 1
+        dataset_name = self.examples[ex_num]['name']
+
+        self.assertFalse(self.minio_client.bucket_exists(dataset_name))
+        self.manager.create(**self.examples[ex_num])
+        self.assertTrue(self.minio_client.bucket_exists(dataset_name))
+
+        with self.assertRaises(Exception):
+            self.manager.create(**self.examples[ex_num])
+
+        does_exist = self.minio_client.bucket_exists(dataset_name)
+        if does_exist:
+            self._datasets_to_cleanup.add(dataset_name)
+
+        self.assertTrue(does_exist)
+
+    def test_create_fails_and_does_not_create_bucket_with_bad_domain(self):
+        """
+        Test that create does not create a dataset when provided a domain that cannot be used to create a Dataset
+        object.
+        """
+        dataset_name = "test-ds-1"
+        invalid_domain = object()
+
+        self.assertFalse(self.minio_client.bucket_exists(dataset_name))
+        with self.assertRaises(Exception):
+            self.manager.create(name=dataset_name, domain=invalid_domain, category=None, is_read_only=False)
+        self.assertFalse(self.minio_client.bucket_exists(dataset_name))
+
+        does_exist = self.minio_client.bucket_exists(dataset_name)
+        if does_exist:
+            self._datasets_to_cleanup.add(dataset_name)
+
+        self.assertFalse(does_exist)

def test_get_data_1_a(self):
""" Test that we can get the serialized file for a newly created dataset. """
ex_num = 1
2 changes: 1 addition & 1 deletion python/lib/modeldata/setup.py
@@ -26,7 +26,7 @@
"fiona",
"geopandas",
"dmod-communication>=0.4.2",
"dmod-core>=0.9.0",
"dmod-core>=0.13.1",
"minio",
"aiohttp~=3.8",
"shapely>=2.0.0",