
Commit

Initial version of CSVStream
nuwangunasekara committed Apr 12, 2024
1 parent 7e1052d commit 0b3d35d
Showing 2 changed files with 127 additions and 24 deletions.
6 changes: 4 additions & 2 deletions src/capymoa/stream/__init__.py
@@ -3,7 +3,8 @@
Schema,
ARFFStream,
RandomTreeGenerator,
stream_from_file
stream_from_file,
CSVStream
)
from .PytorchStream import PytorchStream

@@ -13,5 +14,6 @@
"stream_from_file",
"ARFFStream",
"RandomTreeGenerator",
"PytorchStream"
"PytorchStream",
"CSVStream"
]
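
With this addition, CSVStream is re-exported from the capymoa.stream package and can be imported alongside the existing stream classes. A minimal import sketch (assuming capymoa is installed):

from capymoa.stream import CSVStream, ARFFStream, stream_from_file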
145 changes: 123 additions & 22 deletions src/capymoa/stream/stream.py
@@ -3,6 +3,8 @@
from typing import Dict, Optional, Sequence

import numpy as np
from numpy.lib import recfunctions as rfn

from com.yahoo.labs.samoa.instances import (
Attribute,
DenseInstance,
@@ -701,28 +703,7 @@ def stream_from_file(
# Delegate to the ARFFFileStream object within ARFFStream to actually read the file.
return ARFFStream(path=path_to_csv_or_arff, class_index=class_index)
elif path_to_csv_or_arff.endswith(".csv"):
# Do the file reading here.
_data = np.genfromtxt(
path_to_csv_or_arff, delimiter=",", skip_header=1
) # Assuming a header row

# Extract the feature data (all columns except the last one) and target data (last column)
# TODO: class_index logic should appear in here.
X = _data[:, :-1]
y = _data[:, -1]

# Extract the header from the CSV file (first row)
with open(path_to_csv_or_arff, "r") as file:
header = file.readline().strip().split(",")

return NumpyStream(
X=X,
y=y.astype(int),
dataset_name=dataset_name,
feature_names=header[:-1],
target_name=header[-1],
enforce_regression=enforce_regression,
)
return CSVStream(path_to_csv_or_arff, class_index=class_index)
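
With this change, stream_from_file delegates CSV handling to the new CSVStream class, which reads instances one row at a time instead of materialising X and y up front. A minimal usage sketch, where "data.csv" is a hypothetical file whose last column is the target:

from capymoa.stream import stream_from_file

# "data.csv" is a placeholder path for illustration only
stream = stream_from_file(path_to_csv_or_arff="data.csv", class_index=-1)
while stream.has_more_instances():
    instance = stream.next_instance()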


def _numpy_to_ARFF(
@@ -870,3 +851,123 @@ def _add_instances_to_moa_stream(moa_stream, moa_header, X, y):
instance.setClassValue(y[instance_index]) # set class value

moa_stream.add(instance)

class CSVStream(Stream):
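    """A stream that reads instances incrementally from a CSV file."""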
    def __init__(self,
                 csv_file_path,
                 dtypes: list = None,  # e.g. [('column1', np.float64), ('column2', np.int32), ('column3', str)]; nominal attributes are read as str
                 values_for_nominal_features={},  # e.g. {i: [1, 2, 3], k: ['Aa', 'BB']}; keys are column indices (int), values are turned into strings
                 class_index: int = -1,
                 values_for_class_label: list = None,
                 target_attribute_name=None,
                 enforce_regression=False,
                 skip_header: bool = False,
                 delimiter=','):

self.csv_file_path = csv_file_path
self.values_for_nominal_features = values_for_nominal_features
self.class_index = class_index
self.values_for_class_label = values_for_class_label
self.target_attribute_name = target_attribute_name
self.enforce_regression = enforce_regression
self.skip_header = skip_header
self.delimiter = delimiter

        self.dtypes = []  # e.g. [('column1', np.float64), ('column2', np.int32), ('column3', str)]; nominal attributes are read as str
        if dtypes is None or len(dtypes) == 0:  # data definitions for the columns were not provided
            if len(self.values_for_nominal_features) == 0:  # no values for nominal features were provided either
                # infer the number of columns and their data types from the file itself
# LOADS FIRST TWO ROWS INTO THE MEMORY
data = np.genfromtxt(self.csv_file_path, delimiter=self.delimiter, dtype=None, names=True,
skip_header=0, max_rows=2)
if not self.enforce_regression and self.values_for_class_label is None:
# LOADS THE FULL FILE INTO THE MEMORY
data = np.genfromtxt(self.csv_file_path, delimiter=self.delimiter, dtype=None, names=True,
skip_header=1 if skip_header else 0)
y = data[data.dtype.names[self.class_index]]
self.values_for_class_label = [str(value) for value in np.unique(y)]
for i, data_info in enumerate(data.dtype.descr):
column_name, data_type = data_info
if self.values_for_nominal_features.get(i) is not None: # i is in nominal feature keys
self.dtypes.append((column_name, 'str'))
else:
self.dtypes.append((column_name, data_type))
            else:  # values for nominal features were provided; infer the data types from the full file
# LOADS THE FULL FILE INTO THE MEMORY
data = np.genfromtxt(self.csv_file_path, delimiter=self.delimiter, dtype=None, names=True,
skip_header=1 if skip_header else 0)
self.dtypes = data.dtype
if not self.enforce_regression and self.values_for_class_label is None:
y = data[data.dtype.names[self.class_index]]
self.values_for_class_label = [str(value) for value in np.unique(y)]
        else:  # data definitions for each column were provided
self.dtypes = dtypes

        self.total_number_of_lines = 0
        if self.skip_header:
            self.n_lines_to_skip = 1
        else:
            # Detect a header row: if the column names inferred from the first row differ
            # from those inferred from the second row, treat the first row as a header.
            row1_data = np.genfromtxt(self.csv_file_path, delimiter=self.delimiter, dtype=None,
                                      names=True, skip_header=0, max_rows=1)
            row2_data = np.genfromtxt(self.csv_file_path, delimiter=self.delimiter, dtype=None,
                                      names=True, skip_header=1, max_rows=1)
            if row1_data.dtype.names != row2_data.dtype.names:
                self.n_lines_to_skip = 1
            else:
                self.n_lines_to_skip = 0
        # remember the number of leading lines to skip so that restart() can restore it
        self._initial_n_lines_to_skip = self.n_lines_to_skip

self.__moa_stream_with_only_header, self.moa_header = _init_moa_stream_and_create_moa_header(
number_of_instances=1, # we only need this to initialize the MOA header
feature_names = [data_info[0] for data_info in self.dtypes],
values_for_nominal_features = self.values_for_nominal_features,
values_for_class_label = self.values_for_class_label,
dataset_name="CSVDataset",
target_attribute_name = self.target_attribute_name,
enforce_regression = self.enforce_regression,
)

self.schema = Schema(moa_header=self.moa_header)
super().__init__(schema=self.schema, CLI=None, moa_stream=None)
self.count_number_of_lines()

    def count_number_of_lines(self):
        with open(self.csv_file_path, "r") as file:
            for _ in file:
                self.total_number_of_lines += 1

    def has_more_instances(self):
        return self.total_number_of_lines > self.n_lines_to_skip

    def next_instance(self):
        if not self.has_more_instances():
            return None
        # read the next row, skipping the header (if any) and the rows already consumed
        data = np.genfromtxt(self.csv_file_path, delimiter=self.delimiter, dtype=self.dtypes,
                             names=None, skip_header=self.n_lines_to_skip, max_rows=1)
        # np.genfromtxt() returns a structured array, see
        # https://numpy.org/doc/stable/user/basics.rec.html#structured-arrays
        self.n_lines_to_skip += 1

        y = rfn.structured_to_unstructured(data[[data.dtype.names[self.class_index]]])[0]
        X = rfn.structured_to_unstructured(
            data[[item for item in data.dtype.names if item != data.dtype.names[self.class_index]]])

if self.schema.is_classification():
return LabeledInstance.from_array(self.schema, X, y)
elif self.schema.is_regression():
return RegressionInstance.from_array(self.schema, X, y)
else:
raise ValueError(
"Unknown machine learning task must be a regression or "
"classification task"
)

def get_schema(self):
return self.schema

    def get_moa_stream(self):
        raise ValueError("CSVStream is not backed by a MOA stream; the CSV file is read with numpy")

    def restart(self):
        self.total_number_of_lines = 0
        self.n_lines_to_skip = self._initial_n_lines_to_skip
        self.count_number_of_lines()
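
The new class can also be used directly once it is exported from capymoa.stream (see the __init__.py change above). A minimal usage sketch, where "electricity.csv" and the label values UP/DOWN are placeholders for illustration:

from capymoa.stream import CSVStream

# "electricity.csv" and the class label values are hypothetical examples
stream = CSVStream("electricity.csv",
                   class_index=-1,
                   values_for_class_label=["UP", "DOWN"])
while stream.has_more_instances():
    instance = stream.next_instance()
    # for a classification schema each instance is a LabeledInstance

When values_for_class_label is omitted (and enforce_regression is False), the constructor scans the file once to collect the distinct class values, as implemented in __init__ above.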
