Skip to content

Commit

Permalink
feat(STREAM): change the logic when dealing with csv input
Browse files Browse the repository at this point in the history
  • Loading branch information
Spencer Sun authored and hmgomes committed May 24, 2024
1 parent 1c013f0 commit 2c2a89d
Showing 1 changed file with 70 additions and 57 deletions.
127 changes: 70 additions & 57 deletions src/capymoa/stream/_stream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing
import warnings
from typing import Dict, Optional, Sequence

import numpy as np
Expand All @@ -21,7 +22,25 @@
RegressionInstance,
)


# Private functions
def _target_is_categorical(targets, target_type):
    """Decide whether the target values should be treated as categorical.

    When ``target_type`` is given explicitly it is validated and obeyed.
    Otherwise the type is inferred from the first element of ``targets``:
    strings and booleans are categorical, and numeric targets are categorical
    when they contain fewer than 20 unique values (a warning reports the
    inference so the user can override it).

    :param targets: Non-empty indexable collection of target values
        (e.g. a numpy array or a list).
    :param target_type: ``'categorical'``, ``'numeric'``, or ``None`` to infer
        the type from ``targets``.
    :raises ValueError: If ``target_type`` is neither ``None``, ``'numeric'``
        nor ``'categorical'``.
    :return: ``True`` if the targets are categorical, ``False`` if numeric.
    """
    if target_type is not None:
        if target_type not in ('numeric', 'categorical'):
            raise ValueError('target_type must be either numeric or categorical')
        return target_type == 'categorical'

    first = targets[0]

    # bool must be tested before the numeric branch because bool is a subclass
    # of int. isinstance also accepts numpy scalars (np.str_ subclasses str,
    # np.bool_ is listed explicitly), which exact `type(x) == T` checks miss.
    if isinstance(first, (str, bool, np.bool_)):
        return True

    # Apply the unique-value heuristic to every numeric dtype, not only
    # np.float64; otherwise integer class labels fall through and the function
    # returns None, which callers silently interpret as "numeric".
    if isinstance(first, (int, float, np.integer, np.floating)):
        threshold = 20
        num_unique = len(np.unique(targets))
        if num_unique >= threshold:
            warnings.warn(f'target variable includes {num_unique} (≥ {threshold}) unique values, inferred as numeric, '
                          f'set target_type = \'categorical\' if you intend categorical targets')
            return False
        warnings.warn(f'target variable includes {num_unique} (< {threshold}) unique values, inferred as categorical, '
                      f'set target_type = \'numeric\' if you intend numeric targets')
        return True

    # Unrecognised element types previously fell through and yielded None
    # (falsy); keep that outcome but return an explicit bool.
    return False
class Schema:
"""Schema describes the structure of a stream.
Expand Down Expand Up @@ -131,7 +150,7 @@ def from_custom(
values_for_class_label: Sequence[str] = None,
dataset_name="No_Name",
target_attribute_name=None,
enforce_regression=False,
target_type=None,
):
"""Create a CapyMOA Schema that defines each attribute in the stream.
Expand Down Expand Up @@ -159,7 +178,7 @@ def from_custom(
... values_for_nominal_features={"attrib_1": ["a", "b"]},
... dataset_name="MyRegression",
... target_attribute_name="target",
... enforce_regression=True)
... target_type="numeric")
@relation MyRegression
<BLANKLINE>
@attribute attrib_1 {a,b}
Expand All @@ -178,8 +197,7 @@ def from_custom(
:param dataset_name: Name of the dataset. Default is "No_Name".
:param target_attribute_name: Name of the target/class attribute.
Default is None.
:param enforce_regression: If True, the schema is interpreted as a
regression problem. Default is False.
:param target_type: Set the target type as 'categorical' or 'numeric', None to detect automatically.
:return CapyMOA Schema: Initialized CapyMOA Schema which contains all
necessary attribute information for all features and the class label
"""
Expand All @@ -189,7 +207,7 @@ def from_custom(
values_for_class_label=values_for_class_label,
dataset_name=dataset_name,
target_attribute_name=target_attribute_name,
enforce_regression=enforce_regression,
target_type=target_type,
)
return Schema(moa_header=moa_header)

Expand All @@ -205,7 +223,7 @@ def __str__(self):
class Stream:
"""A datastream that can be learnt instance by instance."""

# TODO: A problem in stream is that is has lots of conditional logic to
# TODO: A problem in stream is that it has lots of conditional logic to
# support a variety of ways to create a Stream object. This makes the code
# harder to understand and maintain. We should consider refactoring this
# with an abstract base class and subclasses for each type of stream.
Expand Down Expand Up @@ -298,22 +316,26 @@ def restart(self):
class ARFFStream(Stream):
"""A datastream originating from an ARFF file."""

def __init__(self, path: str, CLI: Optional[str] = None):
def __init__(
self,
path: str,
CLI: Optional[str] = None,
class_index: int = -1
):
"""Construct an ARFFStream object from a file path.
:param path: A filepath
:param CLI: Additional command line arguments to pass to the MOA stream.
"""
moa_stream = ArffFileStream(path, -1)
moa_stream = ArffFileStream(path, class_index)
super().__init__(moa_stream=moa_stream, CLI=CLI)


class NumpyStream(Stream):
"""A datastream originating from a numpy array."""

# This class is more complex than ARFFStream because it needs to read and convert the CSV to an ARFF in memory.
# enforce_regression overrides the default behavior of inferring whether the data represents a regression or classification task.
# TODO: class_index is currently ignored while reading the file in numpy_to_ARFF
# target_type to specify the target as 'categorical' or 'numeric', None for detecting automatically.

def __init__(
self,
Expand All @@ -322,7 +344,7 @@ def __init__(
dataset_name="No_Name",
feature_names=None,
target_name=None,
enforce_regression=False,
target_type: None | str = None, # numeric or categorical
):
"""Construct a NumpyStream object from a numpy array.
Expand All @@ -331,7 +353,7 @@ def __init__(
:param dataset_name: The name to give to the datastream, defaults to "No_Name"
:param feature_names: The names given to the features, defaults to None
:param target_name: The name given to target values, defaults to None
:param enforce_regression: Should it be used as regression, defaults to False
:param target_type: 'categorical' or 'numeric' target, defaults to None
"""
self.current_instance_index = 0

Expand All @@ -342,7 +364,7 @@ def __init__(
dataset_name,
feature_names=feature_names,
target_name=target_name,
enforce_regression=enforce_regression,
target_type=target_type,
)
)

Expand Down Expand Up @@ -394,7 +416,8 @@ def restart(self):
def stream_from_file(
path_to_csv_or_arff: str = None,
dataset_name: str = "NoName",
enforce_regression: bool = False,
class_index: int = -1,
target_type: None | str = None, # "numeric" or "categorical"
) -> Stream:
"""Create a datastream from a csv or arff file.
Expand All @@ -412,46 +435,43 @@ def stream_from_file(
:param path_to_csv_or_arff: A file path to a CSV or ARFF file.
:param dataset_name: A descriptive name given to the dataset, defaults to "NoName"
:param enforce_regression: When working with a CSV file, this parameter
allows the user to force the data to be interpreted as a regression
problem. Defaults to False.
:param target_type: When working with a CSV file, this parameter
allows the user to specify whether the target values in the data are interpreted as categorical or numeric.
Defaults to None to detect automatically.
"""
assert path_to_csv_or_arff is not None, "A file path must be provided."
if path_to_csv_or_arff.endswith(".arff"):
# Delegate to the ARFFFileStream object within ARFFStream to actually read the file.
return ARFFStream(path=path_to_csv_or_arff)
return ARFFStream(path=path_to_csv_or_arff, class_index=class_index)
elif path_to_csv_or_arff.endswith(".csv"):
# TODO: Upgrade to CSVStream once its faster and notebook tests don't fail
x_features = np.genfromtxt(path_to_csv_or_arff, delimiter=",", skip_header=1)
targets = x_features[:, -1]
targets = targets.astype(int)
x_features = x_features[:, :-1]
targets = x_features[:, class_index]
x_features = np.delete(x_features, class_index, axis=1)
# targets = targets.astype(type(targets[0]))
return NumpyStream(
x_features,
targets,
dataset_name=dataset_name,
enforce_regression=enforce_regression,
target_type=target_type,
)


def _numpy_to_ARFF(
X,
y,
dataset_name="No_Name",
feature_names=None,
target_name=None,
enforce_regression=False,
dataset_name: str ="No_Name",
feature_names: str =None,
target_name: str =None,
target_type: None | str = None,
):
"""Converts a numpy X and y into a ARFF format. The code infers whether it is a classification or regression problem
based on the y type. If y[0] is a double, then assumes regression (thus output will be numeric) otherwise assume
it as a classifiation problem. If the user desires to "force" regression, then set enforce_regression=True
"""Converts a numpy X and y into an ARFF format. The code first checks whether the user has specified the type of the
target values; if not, it infers whether the target is categorical or numeric via the _target_is_categorical
function, i.e., targets with 20 or more unique values are interpreted as numeric, otherwise as categorical.
"""
number_of_instances = X.shape[0]
enforce_regression = (
True if enforce_regression else np.issubdtype(type(y[0]), np.double)
)
class_labels = (
None if enforce_regression else [str(value) for value in np.unique(y)]
None if not _target_is_categorical(y, target_type) or target_type == 'numeric' else [str(value) for value in np.unique(y)]
)
feature_names = (
[f"attrib_{i}" for i in range(X.shape[1])]
Expand All @@ -464,7 +484,7 @@ def _numpy_to_ARFF(
values_for_class_label=class_labels,
dataset_name=dataset_name,
target_attribute_name=target_name,
enforce_regression=enforce_regression,
target_type=target_type,
)
_add_instances_to_moa_stream(moa_stream, moa_header, X, y)
return moa_stream, moa_header, class_labels
Expand All @@ -484,7 +504,7 @@ def _init_moa_stream_and_create_moa_header(
values_for_class_label: list = None,
dataset_name="No_Name",
target_attribute_name=None,
enforce_regression=False,
target_type: None|str =None, # 'categorical' or 'numeric'
):
"""Initialize a moa stream with number_of_instances capacity and create a moa header which contains all the necessary
attribute information.
Expand All @@ -498,7 +518,7 @@ def _init_moa_stream_and_create_moa_header(
:param values_for_class_label: possible values for class label. Values are turned into strings
:param dataset_name: name of the dataset. Defaults to "No_Name"
:param target_attribute_name: name for the target/class attribute
:param enforce_regression: If True assumes the problem as a regression problem
:param target_type: specifies the type of target as 'categorical' or 'numeric', None to detect automatically
:return moa_stream: initialized moa stream with capacity number_of_instances.
:return moa_header: initialized moa header which contain all necessary attribute information for all features and the class label
Expand All @@ -511,8 +531,6 @@ def _init_moa_stream_and_create_moa_header(
values_for_class_label = [str(value) for value in np.unique(y)]
enforce_regression = np.issubdtype(type(y[0]), np.double)
"""
attributes = FastVector()
# Attribute("name") will create a numeric attribute; Attribute("name", array_of_values) will create a nominal attribute
Expand All @@ -529,24 +547,19 @@ def _init_moa_stream_and_create_moa_header(
attribute = Attribute(name)
attributes.addElement(attribute)

if enforce_regression:
if target_type == 'numeric' or values_for_class_label is None:
if target_attribute_name is None:
attributes.addElement(Attribute("target"))
else:
attributes.addElement(Attribute(target_attribute_name))
else:
if values_for_class_label is None:
raise ValueError(
"values_for_class_label are None and enforce_regression is False. Looks like a regression problem?"
)
else:
class_attribute = _create_nominal_attribute(
attribute_name=(
"class" if target_attribute_name is None else target_attribute_name
),
possible_values=values_for_class_label,
)
attributes.addElement(class_attribute)
class_attribute = _create_nominal_attribute(
attribute_name=(
"class" if target_attribute_name is None else target_attribute_name
),
possible_values=values_for_class_label,
)
attributes.addElement(class_attribute)

moa_stream = Instances(dataset_name, attributes, number_of_instances)
# set last index for class index
Expand Down Expand Up @@ -582,7 +595,7 @@ def __init__(
class_index: int = -1,
values_for_class_label: list = None,
target_attribute_name=None,
enforce_regression=False,
target_type: None | str = None,
skip_header: bool = False,
delimiter=",",
):
Expand All @@ -591,7 +604,7 @@ def __init__(
self.class_index = class_index
self.values_for_class_label = values_for_class_label
self.target_attribute_name = target_attribute_name
self.enforce_regression = enforce_regression
self.target_type = target_type
self.skip_header = skip_header
self.delimiter = delimiter

Expand All @@ -614,7 +627,7 @@ def __init__(
skip_header=0,
max_rows=2,
)
if not self.enforce_regression and self.values_for_class_label is None:
if not self.target_type == 'numeric' and self.values_for_class_label is None:
# LOADS THE FULL FILE INTO THE MEMORY
data = np.genfromtxt(
self.csv_file_path,
Expand Down Expand Up @@ -643,7 +656,7 @@ def __init__(
skip_header=1 if skip_header else 0,
)
self.dtypes = data.dtype
if not self.enforce_regression and self.values_for_class_label is None:
if not self.target_type == 'numeric' and self.values_for_class_label is None:
y = data[data.dtype.names[self.class_index]]
self.values_for_class_label = [str(value) for value in np.unique(y)]
else: # data definition for each column are provided
Expand Down Expand Up @@ -682,7 +695,7 @@ def __init__(
values_for_class_label=self.values_for_class_label,
dataset_name="CSVDataset",
target_attribute_name=self.target_attribute_name,
enforce_regression=self.enforce_regression,
target_type=self.target_type,
)
)

Expand Down

0 comments on commit 2c2a89d

Please sign in to comment.