diff --git a/chimerapy/engine/node/node.py b/chimerapy/engine/node/node.py index bd24f91..390b692 100644 --- a/chimerapy/engine/node/node.py +++ b/chimerapy/engine/node/node.py @@ -374,6 +374,43 @@ def save_json(self, name: str, data: Dict[Any, Any]): } self.recorder.submit(json_entry) + def save_text(self, name: str, data: str, suffix="txt"): + """Record text data from the node to a text file. + + Parameters + ---------- + name : str + Name of the text file (.suffix extension will be suffixed). + + data : str + The data to be recorded. + + suffix : str + The suffix of the text file. + + Notes + ----- + It should be noted that new lines addition should be taken by the callee. + """ + + if not self.recorder: + self.logger.warning( + f"{self}: cannot perform recording operation without RecorderService " + "initialization" + ) + return False + + if self.recorder.enabled: + text_entry = { + "uuid": uuid.uuid4(), + "name": name, + "data": data, + "suffix": suffix, + "dtype": "text", + "timestamp": datetime.datetime.now(), + } + self.recorder.submit(text_entry) + #################################################################### ## Back-End Lifecycle API #################################################################### diff --git a/chimerapy/engine/node/record_service.py b/chimerapy/engine/node/record_service.py index 5ae5689..9d32656 100644 --- a/chimerapy/engine/node/record_service.py +++ b/chimerapy/engine/node/record_service.py @@ -14,6 +14,7 @@ TabularRecord, ImageRecord, JSONRecord, + TextRecord, ) from ..service import Service @@ -49,6 +50,7 @@ def __init__( "tabular": TabularRecord, "image": ImageRecord, "json": JSONRecord, + "text": TextRecord, } # Making sure the attribute exists @@ -128,6 +130,8 @@ def run(self): if data_entry["name"] not in self.records: entry_cls = self.record_map[data_entry["dtype"]] entry = entry_cls(dir=self.state.logdir, name=data_entry["name"]) + + # FixMe: Potential overwrite of existing entry? self.records[data_entry["name"]] = entry # Case 2 diff --git a/chimerapy/engine/records/__init__.py b/chimerapy/engine/records/__init__.py index 36e7ff6..12871d9 100644 --- a/chimerapy/engine/records/__init__.py +++ b/chimerapy/engine/records/__init__.py @@ -5,6 +5,7 @@ from .tabular_record import TabularRecord from .video_record import VideoRecord from .json_record import JSONRecord +from .text_record import TextRecord __all__ = [ "Record", @@ -13,4 +14,5 @@ "TabularRecord", "VideoRecord", "JSONRecord", + "TextRecord", ] diff --git a/chimerapy/engine/records/text_record.py b/chimerapy/engine/records/text_record.py new file mode 100644 index 0000000..afa8eaf --- /dev/null +++ b/chimerapy/engine/records/text_record.py @@ -0,0 +1,46 @@ +# Built-in Imports +from typing import Dict, Any, Optional, IO +import pathlib + +# Third-party Imports + +# Internal Import +from .record import Record + + +class TextRecord(Record): + def __init__( + self, + dir: pathlib.Path, + name: str, + ): + """Construct a text file Record. + + Args: + dir (pathlib.Path): The directory to store the snap shots of data. + name (str): The name of the ``Record``. + suffix (str): The suffix of the text file. Defaults to "txt". + """ + super().__init__() + + # Saving the Record attributes + self.dir = dir + self.name = name + self.first_frame = False + self.file_handler: Optional[IO[str]] = None + + def write(self, data_chunk: Dict[str, Any]): + if not self.first_frame: + self.file_handler = (self.dir / f"{self.name}.{data_chunk['suffix']}").open( + "w" + ) + self.first_frame = True + + text_data = data_chunk["data"] + assert self.file_handler is not None + self.file_handler.write(text_data) + + def close(self): + if self.file_handler is not None: + self.file_handler.close() + self.file_handler = None diff --git a/test/streams/data_nodes.py b/test/streams/data_nodes.py index 7a54a0d..bb051dd 100644 --- a/test/streams/data_nodes.py +++ b/test/streams/data_nodes.py @@ -1,5 +1,6 @@ # Build-in Imports import time +import random # Third-party Imports import pyaudio @@ -75,3 +76,18 @@ def step(self): time.sleep(1 / 10) data = {"time": time.time(), "content": "HELLO"} self.save_json(name="test", data=data) + + +class TextNode(cpe.Node): + def setup(self): + self.step_count = 0 + + def step(self): + time.sleep(1 / 10) + num_lines = random.randint(1, 5) + self.step_count += 1 + lines = [] + for j in range(num_lines): + lines.append(f"This is a test - Step Count - {self.step_count + 1}\n") + + self.save_text(name="test", data="".join(lines), suffix="text") diff --git a/test/streams/test_text.py b/test/streams/test_text.py new file mode 100644 index 0000000..c138925 --- /dev/null +++ b/test/streams/test_text.py @@ -0,0 +1,102 @@ +from .data_nodes import TextNode + +# Built-in Imports +import os +import pathlib +import time +import uuid + +# Third-party +import pytest + +# Internal Imports +import chimerapy.engine as cpe +from chimerapy.engine.records.text_record import TextRecord +from chimerapy.engine.networking.async_loop_thread import AsyncLoopThread +from chimerapy.engine.eventbus import EventBus, Event + +logger = cpe._logger.getLogger("chimerapy-engine") + +# Constants +CWD = pathlib.Path(os.path.abspath(__file__)).parent.parent +TEST_DATA_DIR = CWD / "data" + + +@pytest.fixture +def text_node(): + + # Create a node + text_n = TextNode(name="text_n", logdir=TEST_DATA_DIR) + + return text_n + + +def test_text_record(): + + # Check that the image was created + expected_text_path = TEST_DATA_DIR / "test-5.log" + try: + os.rmdir(expected_text_path.parent) + except OSError: + ... + + # Create the record + text_r = TextRecord(dir=TEST_DATA_DIR, name="test-5") + + data = [ + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, " + "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.\n", + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi " + "ut aliquip ex ea commodo consequat.\n", + ] + + # Write to image file + for i in range(5): + print("\n".join(data)) + text_chunk = { + "uuid": uuid.uuid4(), + "name": "test-5", + "suffix": "log", + "data": "".join(data), + "dtype": "text", + } + text_r.write(text_chunk) + + # Check that the image was created + assert expected_text_path.exists() + + with expected_text_path.open("r") as jlf: + for idx, line in enumerate(jlf): + assert line.strip() == (data[idx % len(data)]).strip() + + +def test_node_save_text_stream(text_node): + + # Event Loop + thread = AsyncLoopThread() + thread.start() + eventbus = EventBus(thread=thread) + + # Check that the image was created + expected_text_path = pathlib.Path(text_node.state.logdir) / "test.text" + try: + os.rmdir(expected_text_path.parent) + except OSError: + ... + + # Stream + text_node.run(blocking=False, eventbus=eventbus) + + # Wait to generate files + eventbus.send(Event("start")).result() + logger.debug("Finish start") + eventbus.send(Event("record")).result() + logger.debug("Finish record") + time.sleep(3) + eventbus.send(Event("stop")).result() + logger.debug("Finish stop") + + text_node.shutdown() + + # Check that the image was created + assert expected_text_path.exists()