-
Notifications
You must be signed in to change notification settings - Fork 875
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(framework:skip) Add Flower File Storage interface and disk based…
… implementation
- Loading branch information
1 parent
b658ba7
commit 2d3398a
Showing
4 changed files
with
360 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
# Copyright 2024 Flower Labs GmbH. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# ============================================================================== | ||
"""Flower File Storage for large objects.""" | ||
|
||
|
||
from .disk_ffs import DiskFfs as DiskFfs | ||
from .ffs import Ffs as Ffs | ||
|
||
__all__ = [ | ||
"Ffs", | ||
"DiskFfs", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
# Copyright 2024 Flower Labs GmbH. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# ============================================================================== | ||
"""Disk based Flower File Storage.""" | ||
|
||
import hashlib | ||
import json | ||
import os | ||
from typing import Dict, List, Tuple | ||
|
||
from flwr.server.superlink.ffs.ffs import Ffs | ||
|
||
|
||
def write_dict_to_file(data_dict, file_path): | ||
"""Write a Dict to a file in JSON format.""" | ||
with open(file_path, "w") as file: | ||
json.dump(data_dict, file) | ||
|
||
|
||
def read_dict_from_file(file_path): | ||
"""Read a Dict from a JSON file.""" | ||
with open(file_path) as file: | ||
data_dict = json.load(file) | ||
return data_dict | ||
|
||
|
||
class DiskFfs(Ffs): # pylint: disable=R0904 | ||
"""Disk based Flower File Storage interface for large objects.""" | ||
|
||
def __init__(self, base_dir: str) -> None: | ||
"""Create a new DiskFfs instance. | ||
Parameters | ||
---------- | ||
base_dir : string | ||
The base directory to store the objects. | ||
""" | ||
self.base_dir = base_dir | ||
|
||
def put(self, content: bytes, meta: Dict[str, str]) -> str: | ||
"""Store bytes and metadata. Return sha256hex hash of data as str. | ||
Parameters | ||
---------- | ||
content : bytes | ||
The content to be stored. | ||
meta : Dict[str, str] | ||
The metadata to be stored. | ||
Returns | ||
------- | ||
sha256hex : string | ||
The sha256hex hash of the content. | ||
""" | ||
content_hash = hashlib.sha256(content).hexdigest() | ||
|
||
with open(os.path.join(self.base_dir, content_hash), "wb") as file: | ||
file.write(content) | ||
|
||
write_dict_to_file(meta, os.path.join(self.base_dir, f"{content_hash}.META")) | ||
|
||
return content_hash | ||
|
||
def get(self, key: str) -> Tuple[bytes, Dict[str, str]]: | ||
"""Return tuple containing the object and it's meta fields. | ||
Parameters | ||
---------- | ||
hash : string | ||
The sha256hex hash of the object to be retrieved. | ||
Returns | ||
------- | ||
Tuple[bytes, Dict[str, str]] | ||
A tuple containing the object and it's metadata. | ||
""" | ||
with open(os.path.join(self.base_dir, key), "rb") as file: | ||
content = file.read() | ||
|
||
meta = read_dict_from_file(os.path.join(self.base_dir, f"{key}.META")) | ||
|
||
return content, meta | ||
|
||
def delete(self, key: str) -> None: | ||
"""Delete object with hash. | ||
Parameters | ||
---------- | ||
hash : string | ||
The sha256hex hash of the object to be deleted. | ||
""" | ||
os.remove(os.path.join(self.base_dir, key)) | ||
os.remove(os.path.join(self.base_dir, f"{key}.META")) | ||
|
||
def list(self) -> List[str]: | ||
"""List all keys. | ||
Can be used to list all keys in the storage and e.g. to clean up | ||
the storage with the delete method. | ||
Returns | ||
------- | ||
List[str] | ||
A list of all keys. | ||
""" | ||
return [ | ||
item for item in os.listdir(self.base_dir) if not item.endswith(".META") | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# Copyright 2024 Flower Labs GmbH. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# ============================================================================== | ||
"""Abstract base class for Flower File Storage interface.""" | ||
|
||
|
||
import abc | ||
from typing import Dict, List, Tuple | ||
|
||
|
||
class Ffs(abc.ABC): # pylint: disable=R0904 | ||
"""Abstract Flower File Storage interface for large objects.""" | ||
|
||
@abc.abstractmethod | ||
def put(self, content: bytes, meta: Dict[str, str]) -> str: | ||
"""Store bytes and metadata. Return sha256hex hash of data as str. | ||
Parameters | ||
---------- | ||
content : bytes | ||
The content to be stored. | ||
meta : Dict[str, str] | ||
The metadata to be stored. | ||
Returns | ||
------- | ||
sha256hex : string | ||
The sha256hex hash of the content. | ||
""" | ||
|
||
@abc.abstractmethod | ||
def get(self, key: str) -> Tuple[bytes, Dict[str, str]]: | ||
"""Return tuple containing the object and it's meta fields. | ||
Parameters | ||
---------- | ||
hash : string | ||
The sha256hex hash of the object to be retrieved. | ||
Returns | ||
------- | ||
Tuple[bytes, Dict[str, str]] | ||
A tuple containing the object and it's metadata. | ||
""" | ||
|
||
@abc.abstractmethod | ||
def delete(self, key: str) -> None: | ||
"""Delete object with hash. | ||
Parameters | ||
---------- | ||
hash : string | ||
The sha256hex hash of the object to be deleted. | ||
""" | ||
|
||
@abc.abstractmethod | ||
def list(self) -> List[str]: | ||
"""List all keys. | ||
Can be used to list all keys in the storage and e.g. to clean up | ||
the storage with the delete method. | ||
Returns | ||
------- | ||
List[str] | ||
A list of all keys. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
# Copyright 2024 Flower Labs GmbH. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# ============================================================================== | ||
"""Tests all Ffs implemenations have to conform to.""" | ||
# pylint: disable=invalid-name, disable=R0904 | ||
|
||
import hashlib | ||
import json | ||
import os | ||
import tempfile | ||
import unittest | ||
from abc import abstractmethod | ||
|
||
from flwr.server.superlink.ffs import DiskFfs, Ffs | ||
|
||
|
||
class FfsTest(unittest.TestCase): | ||
"""Test all ffs implementations.""" | ||
|
||
# This is to True in each child class | ||
__test__ = False | ||
|
||
@abstractmethod | ||
def ffs_factory(self) -> Ffs: | ||
"""Provide Ffs implementation to test.""" | ||
raise NotImplementedError() | ||
|
||
def test_put(self) -> None: | ||
"""Test if object can be stored.""" | ||
# Prepare | ||
ffs: Ffs = self.ffs_factory() | ||
content = b"content" | ||
hash_expected = hashlib.sha256(content).hexdigest() | ||
|
||
# Execute | ||
hash_actual = ffs.put(b"content", {"meta": "data"}) | ||
|
||
# Assert | ||
assert isinstance(hash_actual, str) | ||
assert len(hash_actual) == 64 | ||
assert hash_actual == hash_expected | ||
|
||
# Check if file was created | ||
assert [hash_expected, f"{hash_expected}.META"] == os.listdir(self.tmp_dir.name) | ||
|
||
def test_get(self) -> None: | ||
"""Test if object can be retrieved.""" | ||
# Prepare | ||
ffs: Ffs = self.ffs_factory() | ||
content_expected = b"content" | ||
hash_expected = hashlib.sha256(content_expected).hexdigest() | ||
meta_expected = {} | ||
|
||
with open(os.path.join(self.tmp_dir.name, hash_expected), "wb") as file: | ||
file.write(content_expected) | ||
|
||
with open( | ||
os.path.join(self.tmp_dir.name, f"{hash_expected}.META"), "w" | ||
) as file: | ||
json.dump(meta_expected, file) | ||
|
||
# Execute | ||
content_actual, meta_actual = ffs.get(hash_expected) | ||
|
||
# Assert | ||
assert content_actual == content_expected | ||
assert meta_actual == meta_expected | ||
|
||
def test_delete(self) -> None: | ||
"""Test if object can be deleted.""" | ||
# Prepare | ||
ffs: Ffs = self.ffs_factory() | ||
content_expected = b"content" | ||
hash_expected = hashlib.sha256(content_expected).hexdigest() | ||
meta_expected = {} | ||
|
||
with open(os.path.join(self.tmp_dir.name, hash_expected), "wb") as file: | ||
file.write(content_expected) | ||
|
||
with open( | ||
os.path.join(self.tmp_dir.name, f"{hash_expected}.META"), "w" | ||
) as file: | ||
json.dump(meta_expected, file) | ||
|
||
# Execute | ||
ffs.delete(hash_expected) | ||
|
||
# Assert | ||
assert [] == os.listdir(self.tmp_dir.name) | ||
|
||
def test_list(self) -> None: | ||
"""Test if object hashes can be listed.""" | ||
# Prepare | ||
ffs: Ffs = self.ffs_factory() | ||
content_expected = b"content" | ||
hash_expected = hashlib.sha256(content_expected).hexdigest() | ||
meta_expected = {} | ||
|
||
with open(os.path.join(self.tmp_dir.name, hash_expected), "wb") as file: | ||
file.write(content_expected) | ||
|
||
with open( | ||
os.path.join(self.tmp_dir.name, f"{hash_expected}.META"), "w" | ||
) as file: | ||
json.dump(meta_expected, file) | ||
|
||
# Execute | ||
hashes = ffs.list() | ||
|
||
# Assert | ||
assert [hash_expected] == hashes | ||
|
||
|
||
class DiskFfsTest(FfsTest, unittest.TestCase): | ||
"""Test DiskFfs implementation.""" | ||
|
||
__test__ = True | ||
|
||
def ffs_factory(self) -> DiskFfs: | ||
"""Return SqliteState with file-based database.""" | ||
# pylint: disable-next=consider-using-with,attribute-defined-outside-init | ||
self.tmp_dir = tempfile.TemporaryDirectory() | ||
ffs = DiskFfs(self.tmp_dir.name) | ||
return ffs | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main(verbosity=2) |