Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added sim processing: #14

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 13 additions & 36 deletions pytrnsys_process/converter.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
import datetime as _dt
import pathlib as _pl
from dataclasses import dataclass
from enum import Enum

import pandas as _pd

from pytrnsys_process import file_matcher as fm
from pytrnsys_process import readers
from pytrnsys_process.logger import logger


# Bundles the filename markers of one kind of result file with the prefix
# used when building its output name.
@dataclass
class FilePattern:
# Substrings looked for in the lowercased file name (see _has_pattern).
patterns: list[str]
# Handed to _refactor_filename, which puts it in front of the cleaned stem.
prefix: str


# Kinds of result files known to the converter; each member's value is the
# FilePattern (filename markers + output prefix) for that kind.
class FileType(Enum):
# Markers are written in lowercase because _has_pattern lowercases the file
# name before the substring test.
MONTHLY = FilePattern(patterns=["_mo_", "_mo", ".mo"], prefix="mo_")
HOURLY = FilePattern(patterns=["_hr_", "_hr", ".hr"], prefix="hr_")
TIMESTEP = FilePattern(patterns=["_step"], prefix="timestamp_")


class CsvConverter:

@staticmethod
def rename_file_with_prefix(file_path: _pl.Path, prefix: FileType) -> None:
def rename_file_with_prefix(
file_path: _pl.Path, prefix: fm.FileType
) -> None:
"""Rename a file with a given prefix.

Args:
Expand Down Expand Up @@ -68,32 +57,26 @@ def convert_sim_results_to_csv(
if not input_file.is_file():
continue

if self._has_pattern(
input_file.name, FileType.MONTHLY.value.patterns
):
if fm.has_pattern(input_file.name, fm.FileType.MONTHLY):
df = readers.PrtReader().read_monthly(input_file)
output_stem = self._refactor_filename(
input_file.stem,
FileType.MONTHLY.value.patterns,
FileType.MONTHLY.value.prefix,
fm.FileType.MONTHLY.value.patterns,
fm.FileType.MONTHLY.value.prefix,
)
elif self._has_pattern(
input_file.name, FileType.HOURLY.value.patterns
):
elif fm.has_pattern(input_file.name, fm.FileType.HOURLY):
df = readers.PrtReader().read_hourly(input_file)
output_stem = self._refactor_filename(
input_file.stem,
FileType.HOURLY.value.patterns,
FileType.HOURLY.value.prefix,
fm.FileType.HOURLY.value.patterns,
fm.FileType.HOURLY.value.prefix,
)
elif self._has_pattern(
input_file.name, FileType.TIMESTEP.value.patterns
):
elif fm.has_pattern(input_file.name, fm.FileType.TIMESTEP):
df = readers.PrtReader().read_hourly(input_file)
output_stem = self._refactor_filename(
input_file.stem,
FileType.TIMESTEP.value.patterns,
FileType.TIMESTEP.value.prefix,
fm.FileType.TIMESTEP.value.patterns,
fm.FileType.TIMESTEP.value.prefix,
)
else:
logger.warning(
Expand Down Expand Up @@ -142,9 +125,3 @@ def _refactor_filename(
for pattern in patterns:
processed_name = processed_name.replace(pattern, "")
return f"{prefix}{processed_name}"

@staticmethod
def _has_pattern(filename: str, patterns: list[str]) -> bool:
"""Check if filename matches any of the given patterns."""
# Only the file name is lowercased; the patterns are compared as given, so
# callers are expected to pass lowercase patterns.
filename = filename.lower()
# Plain substring test: a pattern may occur anywhere in the name.
return any(pattern in filename for pattern in patterns)
86 changes: 86 additions & 0 deletions pytrnsys_process/file_matcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import datetime as _dt
import pathlib as _pl
from dataclasses import dataclass
from enum import Enum

from pytrnsys_process import readers
from pytrnsys_process.logger import logger


@dataclass
class FilePattern:
    """Filename markers and prefix that describe one kind of result file.

    Attributes:
        patterns: Substrings whose presence in a (lowercased) file name marks
            the file as being of this kind.
        prefix: Prefix associated with files of this kind.
    """

    patterns: list[str]
    prefix: str


class FileType(Enum):
    """Kinds of result files; each member's value is its ``FilePattern``."""

    # The markers are lowercase because the matching helpers in this module
    # lowercase the file name before the substring test.  Declaration order
    # matters: name-based detection iterates the members in this order and
    # returns the first one that matches.
    MONTHLY = FilePattern(
        patterns=["_mo_", "_mo", ".mo", "mo_"],
        prefix="mo_",
    )
    HOURLY = FilePattern(
        patterns=["_hr_", "_hr", ".hr", "hr_"],
        prefix="hr_",
    )
    TIMESTEP = FilePattern(
        patterns=["_step", "step_"],
        prefix="step_",
    )


def get_file_type_using_file_content(file_path: _pl.Path) -> FileType:
    """Determine the file type by analyzing the file's content.

    Args:
        file_path: Path to the file to analyze.

    Returns:
        ``FileType.MONTHLY`` when the first column of the frame returned by
        ``PrtReader.read`` is named ``"Month"``.  Otherwise the file is read
        again with ``PrtReader.read_step`` and the spacing between its first
        two index entries decides: below one hour is ``FileType.TIMESTEP``,
        anything else is ``FileType.HOURLY``.

    Raises:
        ValueError: If the file is not monthly and has fewer than two rows,
            so that no time interval can be derived from it.
    """
    reader = readers.PrtReader()

    # First try reading as a regular file to check whether it is monthly.
    df = reader.read(file_path)
    if df.columns[0] == "Month":
        logger.info("Detected %s as monthly file", file_path)
        return FileType.MONTHLY

    # Not monthly: read as step data and look at the time interval.
    # NOTE(review): assumes read_step yields a time-based index whose
    # differences are comparable to a timedelta - confirm in readers.
    index = reader.read_step(file_path).index
    if len(index) < 2:
        # Without this guard index[1] fails with an IndexError that does not
        # say which file was the problem.
        raise ValueError(
            f"Cannot determine the time interval of {file_path}: "
            "at least two rows are required"
        )
    time_interval = index[1] - index[0]

    if time_interval < _dt.timedelta(hours=1):
        logger.info("Detected %s as step file", file_path)
        return FileType.TIMESTEP

    logger.info("Detected %s as hourly file", file_path)
    return FileType.HOURLY


def get_file_type_using_file_name(file_name: str) -> FileType:
    """Determine the file type by checking the name against known patterns.

    Matching is case-insensitive and substring based (see ``has_pattern``).

    Args:
        file_name: The name of the file to check.

    Returns:
        The first ``FileType`` member, in declaration order, that has a
        pattern contained in ``file_name``.

    Raises:
        ValueError: If no member has a matching pattern.
    """
    # Enum iteration follows declaration order, so the first match wins.
    for file_type in FileType:
        if has_pattern(file_name, file_type):
            return file_type

    # Report the name exactly as it was passed in (not a lowercased copy) so
    # the message points at the real file.
    raise ValueError(f"No matching file type found for filename: {file_name}")


def has_pattern(file_name: str, file_type: FileType) -> bool:
    """Tell whether a file name contains a pattern of the given ``FileType``.

    Args:
        file_name: The name of the file to check; it is lowercased before
            the comparison.
        file_type: The member whose ``value.patterns`` are searched for.

    Returns:
        True if at least one pattern occurs anywhere in the lowercased name,
        False otherwise.
    """
    lowered_name = file_name.lower()
    for pattern in file_type.value.patterns:
        if pattern in lowered_name:
            return True
    return False
30 changes: 9 additions & 21 deletions pytrnsys_process/headers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import pathlib as _pl
import typing as _tp
from abc import ABC
from collections import defaultdict as _defaultdict
from concurrent.futures import ProcessPoolExecutor

from pytrnsys_process import utils
from pytrnsys_process.readers import HeaderReader


Expand All @@ -19,15 +19,17 @@ def _process_sim_file(sim_file):
class Headers:

RESULTS_FOLDER_NAME = "temp"

header_index: _defaultdict[_tp.Any, list]
# TODO adjust type # pylint: disable=fixme
header_index: _defaultdict[str, list]

def __init__(self, path_to_results: _pl.Path):
self.path_to_results = path_to_results
self.header_index = _defaultdict(list)

def init_headers(self):
sim_files = self._get_files(self._get_sim_folders())
sim_files = utils.get_files(
utils.get_sim_folders(self.path_to_results)
)
for sim_file in sim_files:
try:
headers = HeaderReader().read_headers(sim_file)
Expand All @@ -37,7 +39,9 @@ def init_headers(self):
print(f"Could not read {sim_file}: {e}")

def init_headers_multi_process(self):
sim_files = self._get_files(self._get_sim_folders())
sim_files = utils.get_files(
utils.get_sim_folders(self.path_to_results)
)

with ProcessPoolExecutor() as executor:
results = executor.map(_process_sim_file, sim_files)
Expand All @@ -61,22 +65,6 @@ def _index_headers(
for header in headers:
self.header_index[header].append((sim_folder.name, sim_file.name))

def _get_sim_folders(self) -> list[_pl.Path]:
# Collect the directories located directly inside self.path_to_results.
sim_folders = []
# glob("*") does not recurse, so only immediate children are inspected.
for item in self.path_to_results.glob("*"):
if item.is_dir():
sim_folders.append(item)
return sim_folders

def _get_files(self, sim_folders: list[_pl.Path]) -> list[_pl.Path]:
# Gather every path found below <sim_folder>/<RESULTS_FOLDER_NAME> for each
# of the given folders.
sim_files = []
for sim_folder in sim_folders:
# The recursive "**/*" pattern is used and the matches are not filtered, so
# sub-directories end up in the result alongside regular files.
for sim_file in (sim_folder / self.RESULTS_FOLDER_NAME).glob(
"**/*"
):
sim_files.append(sim_file)
return sim_files


class HeaderValidationMixin(ABC):
def validate_headers(
Expand Down
18 changes: 13 additions & 5 deletions pytrnsys_process/plotters.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ def configure(self, ax: _plt.Axes) -> _plt.Axes:
_plt.tight_layout()
return ax

# TODO: Test validation # pylint: disable=fixme
def plot(
self,
df: _pd.DataFrame,
columns: list[str],
**kwargs,
) -> _plt.Figure:
# Thin entry point: df, columns and any extra keyword arguments are passed
# unchanged to self._do_plot, and its figure is returned as is.
return self._do_plot(df, columns, **kwargs)

# TODO: Test validation # pylint: disable=fixme
def plot_with_column_validation(
self,
df: _pd.DataFrame,
columns: list[str],
Expand Down Expand Up @@ -110,16 +118,16 @@ def _do_plot(
"""The matplot date formatter does not work when using df.plot func.
This is an example to plot a stacked bar chart without df.plot"""
fig, ax = _plt.subplots(figsize=size)
x = _np.arange(len(df.index))
x = df.index
bottom = _np.zeros(len(df.index))
for col in columns:
ax.bar(x, df[col], label=col, bottom=bottom, width=0.35)
bottom += df[col]
if use_legend:
ax.legend()
ax.set_xticklabels(
_pd.to_datetime(df.index).strftime(self.DATE_FORMAT)
)
# ax.set_xticklabels(
# _pd.to_datetime(df.index).strftime(self.DATE_FORMAT)
# )
self.configure(ax)
return fig

Expand Down
Empty file.
64 changes: 64 additions & 0 deletions pytrnsys_process/process_sim/process_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pathlib as _pl
from dataclasses import dataclass

import pandas as _pd

from pytrnsys_process import file_matcher as fm
from pytrnsys_process import readers, utils


@dataclass
class SimFile:
    """A single result file of a simulation together with its parsed data.

    Attributes:
        name: File name (the processing functions store ``file_path.name``).
        type: The ``FileType`` the file was detected as.
        data: The frame returned by the reader for this file.
    """

    name: str
    type: fm.FileType
    data: _pd.DataFrame


@dataclass
class Simulation:
    """The processed files of one simulation folder.

    Attributes:
        name: Simulation name (``process_simulation`` stores the folder name).
        files: One ``SimFile`` per processed file of the simulation.
    """

    name: str
    files: list[SimFile]


def process_simulation(
    sim_folder: _pl.Path, detect_file_using_content: bool = False
) -> Simulation:
    """Process every file of one simulation folder.

    Args:
        sim_folder: Folder of the simulation; its name becomes the name of
            the returned ``Simulation``.
        detect_file_using_content: When True each file's type is detected
            from its content, otherwise from its file name.

    Returns:
        A ``Simulation`` holding one ``SimFile`` per path returned by
        ``utils.get_files([sim_folder])``.
    """
    # Pick the per-file strategy once instead of branching for every file.
    if detect_file_using_content:
        process_file = process_file_using_file_content
    else:
        process_file = process_file_using_file_name

    processed = [
        process_file(path) for path in utils.get_files([sim_folder])
    ]
    return Simulation(sim_folder.name, processed)


def _read_sim_file(file_path: _pl.Path, file_type: fm.FileType) -> SimFile:
    """Read ``file_path`` with the ``PrtReader`` method matching ``file_type``.

    Args:
        file_path: Path of the file to read.
        file_type: The already detected type of the file.

    Returns:
        A ``SimFile`` carrying the file name, the given type and the data
        returned by the reader.

    Raises:
        ValueError: If ``file_type`` is not one of the handled members.
    """
    reader = readers.PrtReader()
    if file_type == fm.FileType.MONTHLY:
        data = reader.read_monthly(file_path)
    elif file_type == fm.FileType.HOURLY:
        data = reader.read_hourly(file_path)
    elif file_type == fm.FileType.TIMESTEP:
        data = reader.read_step(file_path)
    else:
        raise ValueError(f"Unknown file type: {file_type}")

    return SimFile(file_path.name, file_type, data)


def process_file_using_file_content(file_path: _pl.Path) -> SimFile:
    """Detect the file type from the file's content, then read the file.

    Args:
        file_path: Path of the file to process.

    Returns:
        The ``SimFile`` for ``file_path``.

    Raises:
        ValueError: If the detected type is not handled by the reader
            dispatch.
    """
    file_type = fm.get_file_type_using_file_content(file_path)
    return _read_sim_file(file_path, file_type)


def process_file_using_file_name(file_path: _pl.Path) -> SimFile:
    """Detect the file type from the file's name, then read the file.

    Args:
        file_path: Path of the file to process; only ``file_path.name`` is
            used for the detection.

    Returns:
        The ``SimFile`` for ``file_path``.

    Raises:
        ValueError: If the detected type is not handled by the reader
            dispatch.
    """
    file_type = fm.get_file_type_using_file_name(file_path.name)
    return _read_sim_file(file_path, file_type)
Loading