From 27a34c9a6d98339ee89189ed9fc334ec7c47dcb6 Mon Sep 17 00:00:00 2001 From: Taner Topal Date: Mon, 12 Aug 2024 16:50:38 -0400 Subject: [PATCH] feat(framework:skip) Add Flower File Storage interface and disk based implementation (#3619) Co-authored-by: Daniel J. Beutel Co-authored-by: Charles Beauville --- src/py/flwr/server/superlink/ffs/__init__.py | 24 +++ src/py/flwr/server/superlink/ffs/disk_ffs.py | 104 +++++++++++++ src/py/flwr/server/superlink/ffs/ffs.py | 79 ++++++++++ src/py/flwr/server/superlink/ffs/ffs_test.py | 150 +++++++++++++++++++ 4 files changed, 357 insertions(+) create mode 100644 src/py/flwr/server/superlink/ffs/__init__.py create mode 100644 src/py/flwr/server/superlink/ffs/disk_ffs.py create mode 100644 src/py/flwr/server/superlink/ffs/ffs.py create mode 100644 src/py/flwr/server/superlink/ffs/ffs_test.py diff --git a/src/py/flwr/server/superlink/ffs/__init__.py b/src/py/flwr/server/superlink/ffs/__init__.py new file mode 100644 index 00000000000..0273d2a630e --- /dev/null +++ b/src/py/flwr/server/superlink/ffs/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Flower File Storage for large objects.""" + + +from .disk_ffs import DiskFfs as DiskFfs +from .ffs import Ffs as Ffs + +__all__ = [ + "DiskFfs", + "Ffs", +] diff --git a/src/py/flwr/server/superlink/ffs/disk_ffs.py b/src/py/flwr/server/superlink/ffs/disk_ffs.py new file mode 100644 index 00000000000..5331af50046 --- /dev/null +++ b/src/py/flwr/server/superlink/ffs/disk_ffs.py @@ -0,0 +1,104 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Disk based Flower File Storage.""" + +import hashlib +import json +from pathlib import Path +from typing import Dict, List, Tuple + +from flwr.server.superlink.ffs.ffs import Ffs + + +class DiskFfs(Ffs): # pylint: disable=R0904 + """Disk-based Flower File Storage interface for large objects.""" + + def __init__(self, base_dir: str) -> None: + """Create a new DiskFfs instance. + + Parameters + ---------- + base_dir : str + The base directory to store the objects. + """ + self.base_dir = Path(base_dir) + + def put(self, content: bytes, meta: Dict[str, str]) -> str: + """Store bytes and metadata and return key (hash of content). + + Parameters + ---------- + content : bytes + The content to be stored. + meta : Dict[str, str] + The metadata to be stored. 
+ + Returns + ------- + key : str + The key (sha256hex hash) of the content. + """ + content_hash = hashlib.sha256(content).hexdigest() + + self.base_dir.mkdir(exist_ok=True, parents=True) + (self.base_dir / content_hash).write_bytes(content) + (self.base_dir / f"{content_hash}.META").write_text(json.dumps(meta)) + + return content_hash + + def get(self, key: str) -> Tuple[bytes, Dict[str, str]]: + """Return tuple containing the object content and metadata. + + Parameters + ---------- + key : str + The sha256hex hash of the object to be retrieved. + + Returns + ------- + Tuple[bytes, Dict[str, str]] + A tuple containing the object content and metadata. + """ + content = (self.base_dir / key).read_bytes() + meta = json.loads((self.base_dir / f"{key}.META").read_text()) + + return content, meta + + def delete(self, key: str) -> None: + """Delete object with hash. + + Parameters + ---------- + key : str + The sha256hex hash of the object to be deleted. + """ + (self.base_dir / key).unlink() + (self.base_dir / f"{key}.META").unlink() + + def list(self) -> List[str]: + """List all keys. + + Return all available keys in this `Ffs` instance. + This can be combined with, for example, + the `delete` method to delete objects. + + Returns + ------- + List[str] + A list of all available keys. + """ + return [ + item.name for item in self.base_dir.iterdir() if not item.suffix == ".META" + ] diff --git a/src/py/flwr/server/superlink/ffs/ffs.py b/src/py/flwr/server/superlink/ffs/ffs.py new file mode 100644 index 00000000000..622988141c9 --- /dev/null +++ b/src/py/flwr/server/superlink/ffs/ffs.py @@ -0,0 +1,79 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Abstract base class for Flower File Storage interface.""" + + +import abc +from typing import Dict, List, Tuple + + +class Ffs(abc.ABC): # pylint: disable=R0904 + """Abstract Flower File Storage interface for large objects.""" + + @abc.abstractmethod + def put(self, content: bytes, meta: Dict[str, str]) -> str: + """Store bytes and metadata and return sha256hex hash of data as str. + + Parameters + ---------- + content : bytes + The content to be stored. + meta : Dict[str, str] + The metadata to be stored. + + Returns + ------- + key : str + The key (sha256hex hash) of the content. + """ + + @abc.abstractmethod + def get(self, key: str) -> Tuple[bytes, Dict[str, str]]: + """Return tuple containing the object content and metadata. + + Parameters + ---------- + key : str + The key (sha256hex hash) of the object to be retrieved. + + Returns + ------- + Tuple[bytes, Dict[str, str]] + A tuple containing the object content and metadata. + """ + + @abc.abstractmethod + def delete(self, key: str) -> None: + """Delete object with hash. + + Parameters + ---------- + key : str + The key (sha256hex hash) of the object to be deleted. + """ + + @abc.abstractmethod + def list(self) -> List[str]: + """List keys of all stored objects. + + Return all available keys in this `Ffs` instance. + This can be combined with, for example, + the `delete` method to delete objects. + + Returns + ------- + List[str] + A list of all available keys. 
+ """ diff --git a/src/py/flwr/server/superlink/ffs/ffs_test.py b/src/py/flwr/server/superlink/ffs/ffs_test.py new file mode 100644 index 00000000000..3b25ac7b206 --- /dev/null +++ b/src/py/flwr/server/superlink/ffs/ffs_test.py @@ -0,0 +1,150 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests all Ffs implementations have to conform to.""" +# pylint: disable=invalid-name, disable=R0904 + +import hashlib +import json +import os +import tempfile +import unittest +from abc import abstractmethod +from typing import Dict + +from flwr.server.superlink.ffs import DiskFfs, Ffs + + +class FfsTest(unittest.TestCase): + """Test all ffs implementations.""" + + # This is set to True in each child class + __test__ = False + + tmp_dir: tempfile.TemporaryDirectory # type: ignore + + @abstractmethod + def ffs_factory(self) -> Ffs: + """Provide Ffs implementation to test.""" + raise NotImplementedError() + + def test_put(self) -> None: + """Test if object can be stored.""" + # Prepare + ffs: Ffs = self.ffs_factory() + content = b"content" + hash_expected = hashlib.sha256(content).hexdigest() + + # Execute + hash_actual = ffs.put(b"content", {"meta": "data"}) + + # Assert + assert isinstance(hash_actual, str) + assert len(hash_actual) == 64 + assert hash_actual == hash_expected + + # Check if file was created + assert {hash_expected, 
f"{hash_expected}.META"} == set( + os.listdir(self.tmp_dir.name) + ) + + def test_get(self) -> None: + """Test if object can be retrieved.""" + # Prepare + ffs: Ffs = self.ffs_factory() + content_expected = b"content" + hash_expected = hashlib.sha256(content_expected).hexdigest() + meta_expected: Dict[str, str] = {"meta_key": "meta_value"} + + with open(os.path.join(self.tmp_dir.name, hash_expected), "wb") as file: + file.write(content_expected) + + with open( + os.path.join(self.tmp_dir.name, f"{hash_expected}.META"), + "w", + encoding="utf-8", + ) as file: + json.dump(meta_expected, file) + + # Execute + content_actual, meta_actual = ffs.get(hash_expected) + + # Assert + assert content_actual == content_expected + assert meta_actual == meta_expected + + def test_delete(self) -> None: + """Test if object can be deleted.""" + # Prepare + ffs: Ffs = self.ffs_factory() + content_expected = b"content" + hash_expected = hashlib.sha256(content_expected).hexdigest() + meta_expected: Dict[str, str] = {"meta_key": "meta_value"} + + with open(os.path.join(self.tmp_dir.name, hash_expected), "wb") as file: + file.write(content_expected) + + with open( + os.path.join(self.tmp_dir.name, f"{hash_expected}.META"), + "w", + encoding="utf-8", + ) as file: + json.dump(meta_expected, file) + + # Execute + ffs.delete(hash_expected) + + # Assert + assert set() == set(os.listdir(self.tmp_dir.name)) + + def test_list(self) -> None: + """Test if object hashes can be listed.""" + # Prepare + ffs: Ffs = self.ffs_factory() + content_expected = b"content" + hash_expected = hashlib.sha256(content_expected).hexdigest() + meta_expected: Dict[str, str] = {"meta_key": "meta_value"} + + with open(os.path.join(self.tmp_dir.name, hash_expected), "wb") as file: + file.write(content_expected) + + with open( + os.path.join(self.tmp_dir.name, f"{hash_expected}.META"), + "w", + encoding="utf-8", + ) as file: + json.dump(meta_expected, file) + + # Execute + hashes = ffs.list() + + # Assert + assert 
+ {hash_expected} == set(hashes) + + +class DiskFfsTest(FfsTest, unittest.TestCase): + """Test DiskFfs implementation.""" + + __test__ = True + + def ffs_factory(self) -> DiskFfs: + """Return DiskFfs backed by a temporary directory.""" + # pylint: disable-next=consider-using-with,attribute-defined-outside-init + self.tmp_dir = tempfile.TemporaryDirectory() + ffs = DiskFfs(self.tmp_dir.name) + return ffs + + +if __name__ == "__main__": + unittest.main(verbosity=2)