Skip to content

Commit

Permalink
Add text record. Closes #269 (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
umesh-timalsina authored Sep 21, 2023
1 parent 3103413 commit 0356aad
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 0 deletions.
37 changes: 37 additions & 0 deletions chimerapy/engine/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,43 @@ def save_json(self, name: str, data: Dict[Any, Any]):
}
self.recorder.submit(json_entry)

def save_text(self, name: str, data: str, suffix="txt"):
"""Record text data from the node to a text file.
Parameters
----------
name : str
Name of the text file (.suffix extension will be suffixed).
data : str
The data to be recorded.
suffix : str
The suffix of the text file.
Notes
-----
It should be noted that new lines addition should be taken by the callee.
"""

if not self.recorder:
self.logger.warning(
f"{self}: cannot perform recording operation without RecorderService "
"initialization"
)
return False

if self.recorder.enabled:
text_entry = {
"uuid": uuid.uuid4(),
"name": name,
"data": data,
"suffix": suffix,
"dtype": "text",
"timestamp": datetime.datetime.now(),
}
self.recorder.submit(text_entry)

####################################################################
## Back-End Lifecycle API
####################################################################
Expand Down
4 changes: 4 additions & 0 deletions chimerapy/engine/node/record_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
TabularRecord,
ImageRecord,
JSONRecord,
TextRecord,
)
from ..service import Service

Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(
"tabular": TabularRecord,
"image": ImageRecord,
"json": JSONRecord,
"text": TextRecord,
}

# Making sure the attribute exists
Expand Down Expand Up @@ -128,6 +130,8 @@ def run(self):
if data_entry["name"] not in self.records:
entry_cls = self.record_map[data_entry["dtype"]]
entry = entry_cls(dir=self.state.logdir, name=data_entry["name"])

# FixMe: Potential overwrite of existing entry?
self.records[data_entry["name"]] = entry

# Case 2
Expand Down
2 changes: 2 additions & 0 deletions chimerapy/engine/records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .tabular_record import TabularRecord
from .video_record import VideoRecord
from .json_record import JSONRecord
from .text_record import TextRecord

__all__ = [
"Record",
Expand All @@ -13,4 +14,5 @@
"TabularRecord",
"VideoRecord",
"JSONRecord",
"TextRecord",
]
46 changes: 46 additions & 0 deletions chimerapy/engine/records/text_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Built-in Imports
from typing import Dict, Any, Optional, IO
import pathlib

# Third-party Imports

# Internal Import
from .record import Record


class TextRecord(Record):
def __init__(
self,
dir: pathlib.Path,
name: str,
):
"""Construct a text file Record.
Args:
dir (pathlib.Path): The directory to store the snap shots of data.
name (str): The name of the ``Record``.
suffix (str): The suffix of the text file. Defaults to "txt".
"""
super().__init__()

# Saving the Record attributes
self.dir = dir
self.name = name
self.first_frame = False
self.file_handler: Optional[IO[str]] = None

def write(self, data_chunk: Dict[str, Any]):
if not self.first_frame:
self.file_handler = (self.dir / f"{self.name}.{data_chunk['suffix']}").open(
"w"
)
self.first_frame = True

text_data = data_chunk["data"]
assert self.file_handler is not None
self.file_handler.write(text_data)

def close(self):
if self.file_handler is not None:
self.file_handler.close()
self.file_handler = None
16 changes: 16 additions & 0 deletions test/streams/data_nodes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Build-in Imports
import time
import random

# Third-party Imports
import pyaudio
Expand Down Expand Up @@ -75,3 +76,18 @@ def step(self):
time.sleep(1 / 10)
data = {"time": time.time(), "content": "HELLO"}
self.save_json(name="test", data=data)


class TextNode(cpe.Node):
def setup(self):
self.step_count = 0

def step(self):
time.sleep(1 / 10)
num_lines = random.randint(1, 5)
self.step_count += 1
lines = []
for j in range(num_lines):
lines.append(f"This is a test - Step Count - {self.step_count + 1}\n")

self.save_text(name="test", data="".join(lines), suffix="text")
102 changes: 102 additions & 0 deletions test/streams/test_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from .data_nodes import TextNode

# Built-in Imports
import os
import pathlib
import time
import uuid

# Third-party
import pytest

# Internal Imports
import chimerapy.engine as cpe
from chimerapy.engine.records.text_record import TextRecord
from chimerapy.engine.networking.async_loop_thread import AsyncLoopThread
from chimerapy.engine.eventbus import EventBus, Event

logger = cpe._logger.getLogger("chimerapy-engine")

# Constants
CWD = pathlib.Path(os.path.abspath(__file__)).parent.parent
TEST_DATA_DIR = CWD / "data"


@pytest.fixture
def text_node():

# Create a node
text_n = TextNode(name="text_n", logdir=TEST_DATA_DIR)

return text_n


def test_text_record():

# Check that the image was created
expected_text_path = TEST_DATA_DIR / "test-5.log"
try:
os.rmdir(expected_text_path.parent)
except OSError:
...

# Create the record
text_r = TextRecord(dir=TEST_DATA_DIR, name="test-5")

data = [
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.\n",
"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi "
"ut aliquip ex ea commodo consequat.\n",
]

# Write to image file
for i in range(5):
print("\n".join(data))
text_chunk = {
"uuid": uuid.uuid4(),
"name": "test-5",
"suffix": "log",
"data": "".join(data),
"dtype": "text",
}
text_r.write(text_chunk)

# Check that the image was created
assert expected_text_path.exists()

with expected_text_path.open("r") as jlf:
for idx, line in enumerate(jlf):
assert line.strip() == (data[idx % len(data)]).strip()


def test_node_save_text_stream(text_node):

# Event Loop
thread = AsyncLoopThread()
thread.start()
eventbus = EventBus(thread=thread)

# Check that the image was created
expected_text_path = pathlib.Path(text_node.state.logdir) / "test.text"
try:
os.rmdir(expected_text_path.parent)
except OSError:
...

# Stream
text_node.run(blocking=False, eventbus=eventbus)

# Wait to generate files
eventbus.send(Event("start")).result()
logger.debug("Finish start")
eventbus.send(Event("record")).result()
logger.debug("Finish record")
time.sleep(3)
eventbus.send(Event("stop")).result()
logger.debug("Finish stop")

text_node.shutdown()

# Check that the image was created
assert expected_text_path.exists()

0 comments on commit 0356aad

Please sign in to comment.