Skip to content

Commit

Permalink
tests for TasksOpenDataStore
Browse files Browse the repository at this point in the history
  • Loading branch information
kbuma committed Apr 17, 2024
1 parent a7e6d50 commit b3b059c
Showing 1 changed file with 56 additions and 2 deletions.
58 changes: 56 additions & 2 deletions tests/stores/test_open_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from bson import json_util
from moto import mock_s3

from maggma.stores.open_data import OpenDataStore, PandasMemoryStore, S3IndexStore
from maggma.stores.open_data import OpenDataStore, PandasMemoryStore, S3IndexStore, TasksOpenDataStore

pd.set_option("future.no_silent_downcasting", True)

Expand Down Expand Up @@ -474,7 +474,6 @@ def test_rebuild_index_from_s3_data(s3store):
data = [
{"task_id": "mp-99", "data": "asd", s3store.last_updated_field: datetime.utcnow(), "group": {"level_two": 4}}
]
client = boto3.client("s3", region_name="us-east-1")
string_io2 = StringIO()
with jsonlines.Writer(string_io2, dumps=json_util.dumps) as writer:
for _, row in pd.DataFrame(data).iterrows():
Expand Down Expand Up @@ -557,3 +556,58 @@ def test_additional_metadata(s3store):

with pytest.raises(TypeError):
s3store.update(data, key="a", additional_metadata="task_id")


def test_rebuild_index_from_s3_for_tasks_store():
data = [{"task_id": "mp-2", "data": "asd", "last_updated": datetime.utcnow(), "group": {"level_two": 4}}]
with mock_s3():
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket="bucket1")

string_io = StringIO()
with jsonlines.Writer(string_io, dumps=json_util.dumps) as writer:
for _, row in pd.DataFrame(data).iterrows():
writer.write(row.to_dict())

client = boto3.client("s3", region_name="us-east-1")
client.put_object(
Bucket="bucket1",
Body=BytesIO(gzip.compress(string_io.getvalue().encode("utf-8"))),
Key="group_level_two=4/dt=some_random_data.jsonl.gz",
)

store = TasksOpenDataStore(
collection_name="index", bucket="bucket1", key="task_id", object_grouping=["group_level_two", "dt"]
)
store.connect()

index_docs = store.rebuild_index_from_s3_data()
assert len(index_docs) == 1
assert len(store.index.index_data) == 1
for key in index_docs.columns:
assert key == "task_id" or key == "last_updated" or key == "group_level_two" or key == "dt"
assert index_docs["dt"].iloc[0] == "some_random_data"


def test_no_update_for_tasks_store():
with mock_s3():
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket="bucket1")
store = TasksOpenDataStore(
collection_name="index", bucket="bucket1", key="task_id", object_grouping=["group_level_two", "dt"]
)
store.connect()

with pytest.raises(NotImplementedError):
store.update(
pd.DataFrame(
[
{
"task_id": "mp-199999",
"data": "foo",
"group": {"level_two": 4},
store.last_updated_field: datetime.utcnow(),
}
]
)
)

0 comments on commit b3b059c

Please sign in to comment.