This repository has been archived by the owner on Sep 1, 2023. It is now read-only.

Documented nupic.data #3593

Merged
merged 16 commits on May 9, 2017
13 changes: 3 additions & 10 deletions docs/README.md
@@ -93,19 +93,12 @@ nupic
│   ├── temporal_memory.py [TODO]
│   └── temporal_memory_shim.py [TODO]
├── data
│   ├── aggregator.py [DEFER]
│   ├── dictutils.py [DEFER]
│   ├── fieldmeta.py [OK]
│   ├── file_record_stream.py [OK]
│   ├── inference_shifter.py [OK]
│   ├── joiner.py [TODO]
│   ├── jsonhelpers.py [TODO]
│   ├── record_stream.py [TODO]
│   ├── sorter.py [TODO]
│   ├── stats.py [TODO]
│   ├── stats_v2.py [TODO]
│   ├── stream_reader.py [TODO]
│   └── utils.py [TODO]
│   ├── record_stream.py [OK]
│   ├── stream_reader.py [OK]
│   └── utils.py [OK]
├── encoders
│   ├── adaptivescalar.py [OK]
│   ├── base.py [OK]
8 changes: 8 additions & 0 deletions docs/source/api/data/index.rst
@@ -35,3 +35,11 @@ SensorInput
.. autoclass:: nupic.frameworks.opf.opf_utils.SensorInput
:members:

Utilities
^^^^^^^^^

.. automodule:: nupic.data.utils
:members:

.. autodata:: nupic.data.utils.DATETIME_FORMATS
:annotation:
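
For orientation, a minimal sketch of how a list of candidate formats like ``DATETIME_FORMATS`` is typically applied when parsing record timestamps. The format strings and the ``parse_timestamp`` helper below are illustrative assumptions, not the module's actual values:

.. code-block:: python

    from datetime import datetime

    # Illustrative candidates; see nupic.data.utils.DATETIME_FORMATS
    # for the real, documented list.
    CANDIDATE_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d')

    def parse_timestamp(text):
        # Try each format in order; return the first successful parse.
        for fmt in CANDIDATE_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        raise ValueError("Unrecognized timestamp: %r" % text)

    parse_timestamp('2017-05-09 12:30:00')  # -> datetime(2017, 5, 9, 12, 30)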
4 changes: 4 additions & 0 deletions docs/source/api/opf/utils.rst
@@ -19,3 +19,7 @@ Helpers

.. automodule:: nupic.frameworks.opf.opf_helpers
:members:

.. autoclass:: nupic.frameworks.opf.opf_utils.InferenceType
:members:
:show-inheritance:
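
Since ``InferenceType`` now appears in the rendered docs, a hedged sketch of how it is typically consulted; the member and helper names here (``TemporalMultiStep``, ``isTemporal``) are assumptions about the API, so check the generated reference for the authoritative list:

.. code-block:: python

    from nupic.frameworks.opf.opf_utils import InferenceType

    # Choose the inference type a model should run with (assumed member).
    inference = InferenceType.TemporalMultiStep

    # isTemporal() is assumed to distinguish temporal from nontemporal types.
    if InferenceType.isTemporal(inference):
        print("this inference type uses the temporal algorithm")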
2 changes: 1 addition & 1 deletion src/nupic/data/record_stream.py
@@ -284,7 +284,7 @@ def getAggregationMonthsAndSeconds(self):
If there is no aggregation associated with the stream, returns None.

Typically, a raw file or hbase stream will NOT have any aggregation info,
but subclasses of RecordStreamIFace, like :class:`~nupic.data.StreamReader`,
but subclasses of RecordStreamIface, like :class:`~nupic.data.StreamReader`,
will and will return the aggregation period from this call. This call is
used by the :meth:`getNextRecordDict` method to assign a record number to a
record given its timestamp and the aggregation interval
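
To make the return shape concrete, a sketch of the aggregation period a stream aggregated into one-hour buckets would report; the exact dict is inferred from the docstring, not from a live stream:

.. code-block:: python

    aggregationPeriod = {
        'months': 0,        # integer number of months in the period
        'seconds': 3600.0,  # float seconds; only one field may be non-zero
    }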
199 changes: 106 additions & 93 deletions src/nupic/data/stream_reader.py
@@ -50,86 +50,88 @@ class StreamTimeoutException(Exception):
class StreamReader(RecordStreamIface):
"""
Implements a stream reader. This is a high level class that owns one or more
underlying implementations of a RecordStreamIFace. Each RecordStreamIFace
implements the raw reading of records from the record store (which could be a
file, hbase table or something else).
underlying implementations of a
:class:`~nupic.data.record_stream.RecordStreamIface`. Each
:class:`~nupic.data.record_stream.RecordStreamIface` implements the raw
reading of records from the record store (which could be a file, hbase table
or something else).

In the future, we will support joining of two or more RecordStreamIface's (
which is why the streamDef accepts a list of 'stream' elements), but for now
only 1 source is supported.
In the future, we will support joining of two or more
:class:`~nupic.data.record_stream.RecordStreamIface`'s (which is why the
``streamDef`` accepts a list of 'stream' elements), but for now only 1 source
is supported.

The class also implements aggregation of the (in the future) joined records
from the sources.

This module parses the stream definition (as defined in
/nupic/frameworks/opf/jsonschema/stream_def.json), creates the
RecordStreamIFace for each source ('stream's element) defined in the stream
def, performs aggregation, and returns each record in the correct format
according to the desired column names specified in the streamDef.
``/src/nupic/frameworks/opf/jsonschema/stream_def.json``), creates the
:class:`~nupic.data.record_stream.RecordStreamIface` for each source
('stream' element) defined in the stream def, performs aggregation, and
returns each record in the correct format according to the desired column
names specified in the streamDef.

This class implements the RecordStreamIFace interface and thus can be used
in place of a raw record stream.
This class implements the :class:`~nupic.data.record_stream.RecordStreamIface`
interface and thus can be used in place of a raw record stream.

This is an example streamDef:
{
'version': 1
'info': 'test_hotgym',

'streams': [
{'columns': [u'*'],
'info': u'hotGym.csv',
'last_record': 4000,
'source': u'file://extra/hotgym/hotgym.csv'}.
],

'timeField': 'timestamp',

'aggregation': {
'hours': 1,
'fields': [
('timestamp', 'first'),
('gym', 'first'),
('consumption', 'sum')
],
}

}

"""
.. code-block:: python

{
'version': 1,
'info': 'test_hotgym',

def __init__(self, streamDef, bookmark=None, saveOutput=False,
isBlocking=True, maxTimeout=0, eofOnTimeout=False):
""" Base class constructor, performs common initialization
'streams': [
{'columns': [u'*'],
'info': u'hotGym.csv',
'last_record': 4000,
'source': u'file://extra/hotgym/hotgym.csv'},
],

Parameters:
----------------------------------------------------------------
streamDef: The stream definition, potentially containing multiple sources
(not supported yet). See
/nupic/frameworks/opf/jsonschema/stream_def.json for the format
of this dict
'timeField': 'timestamp',

bookmark: Bookmark to start reading from. This overrides the first_record
field of the streamDef if provided.
'aggregation': {
'hours': 1,
'fields': [
('timestamp', 'first'),
('gym', 'first'),
('consumption', 'sum')
],
}

saveOutput: If true, save the output to a csv file in a temp directory.
The path to the generated file can be found in the log
output.
}

isBlocking: should read operation block *forever* if the next row of data
is not available, but the stream is not marked as 'completed'
yet?

maxTimeout: if isBlocking is False, max seconds to wait for more data before
timing out; ignored when isBlocking is True.
:param streamDef: The stream definition, potentially containing multiple
sources (not supported yet). See
``src/nupic/frameworks/opf/jsonschema/stream_def.json`` for the
format of this dict.

eofOnTimeout: If True and we get a read timeout (isBlocking must be False
to get read timeouts), assume we've reached the end of the
input and produce the last aggregated record, if one can be
completed.
:param bookmark: Bookmark to start reading from. This overrides the
first_record field of the streamDef if provided.

:param saveOutput: If true, save the output to a csv file in a temp
directory. The path to the generated file can be found in the log
output.

:param isBlocking: should read operation block *forever* if the next row of
data is not available, but the stream is not marked as 'completed'
yet?

:param maxTimeout: if isBlocking is False, max seconds to wait for more data
before timing out; ignored when isBlocking is True.

:param eofOnTimeout: If True and we get a read timeout (isBlocking must be
False to get read timeouts), assume we've reached the end of the
input and produce the last aggregated record, if one can be
completed.

"""

"""

def __init__(self, streamDef, bookmark=None, saveOutput=False,
isBlocking=True, maxTimeout=0, eofOnTimeout=False):
# Call superclass constructor
super(StreamReader, self).__init__()
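
To ground the constructor parameters above, a hedged usage sketch built from the docstring's hotgym example; the CSV path is illustrative and assumes a local copy of the dataset:

.. code-block:: python

    from nupic.data.stream_reader import StreamReader

    streamDef = {
        'version': 1,
        'info': 'test_hotgym',
        'streams': [
            {'columns': [u'*'],
             'info': u'hotGym.csv',
             'last_record': 4000,
             'source': u'file://extra/hotgym/hotgym.csv'},  # illustrative path
        ],
        'timeField': 'timestamp',
        'aggregation': {
            'hours': 1,
            'fields': [
                ('timestamp', 'first'),
                ('gym', 'first'),
                ('consumption', 'sum'),
            ],
        },
    }

    # Non-blocking reader that waits at most 5 seconds for new data.
    reader = StreamReader(streamDef, isBlocking=False, maxTimeout=5)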

@@ -304,7 +306,8 @@ def close(self):

def getNextRecord(self):
""" Returns combined data from all sources (values only).
Returns None on EOF; empty sequence on timeout.

:returns: None on EOF; empty sequence on timeout.
"""


@@ -370,7 +373,8 @@ def getNextRecord(self):


def getDataRowCount(self):
"""Iterates through stream to calculate total records after aggregation.
"""
Iterates through stream to calculate total records after aggregation.
This will alter the bookmark state.
"""
inputRowCountAfterAggregation = 0
@@ -385,14 +389,17 @@ def getDataRowCount(self):


def getNextRecordIdx(self):
"""Returns the index of the record that will be read next from
getNextRecord()
"""
:returns: the index of the record that will be read next from
:meth:`getNextRecord`.
"""
return self._recordCount


def recordsExistAfter(self, bookmark):
"""Returns True iff there are records left after the bookmark."""
"""
:returns: True if there are records left after the bookmark.
"""
return self._recordStore.recordsExistAfter(bookmark)


@@ -402,56 +409,52 @@ def getAggregationMonthsAndSeconds(self):
seconds is a floating point. Only one is allowed to be non-zero at a
time.

If there is no aggregation associated with the stream, returns None.
Returns the aggregation period from the stream definition. This call is
used by the :meth:`nupic.data.record_stream.RecordStream.getNextRecordDict`
method to assign a record number to a record given its timestamp and the
aggregation interval.

Typically, a raw file or hbase stream will NOT have any aggregation info,
but subclasses of RecordStreamIFace, like StreamReader, will and will
return the aggregation period from this call. This call is used by the
getNextRecordDict() method to assign a record number to a record given
its timestamp and the aggregation interval
:returns: aggregationPeriod (as a dict) where:

Parameters:
------------------------------------------------------------------------
retval: aggregationPeriod (as a dict) or None
'months': number of months in aggregation period
'seconds': number of seconds in aggregation period (as a float)
- ``months``: number of months in aggregation period
- ``seconds``: number of seconds in aggregation period
(as a float)
"""
return self._aggMonthsAndSeconds


def appendRecord(self, record, inputRef=None):
"""Saves the record in the underlying storage."""
raise RuntimeError("Not implemented in StreamReader")


def appendRecords(self, records, inputRef=None, progressCB=None):
"""Saves multiple records in the underlying storage."""
raise RuntimeError("Not implemented in StreamReader")


def seekFromEnd(self, numRecords):
"""Seeks to numRecords from the end and returns a bookmark to the new
position.
"""
raise RuntimeError("Not implemented in StreamReader")


def getFieldNames(self):
""" Returns all fields in all inputs (list of plain names).
NOTE: currently, only one input is supported
"""
Returns all fields in all inputs (list of plain names).

.. note:: currently, only one input is supported
"""
return [f.name for f in self._streamFields]


def getFields(self):
""" Returns a sequence of nupic.data.fieldmeta.FieldMetaInfo
name/type/special tuples for each field in the stream.
"""
:returns: a sequence of :class:`nupic.data.fieldmeta.FieldMetaInfo` for each
field in the stream.
"""
return self._streamFields
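
A sketch of what these two accessors yield for the hotgym stream defined earlier; the field names come from the docstring example, and the ``name``/``type``/``special`` attribute layout of ``FieldMetaInfo`` is an assumption:

.. code-block:: python

    # Plain column names, e.g. for writing a CSV header.
    print(reader.getFieldNames())  # ['timestamp', 'gym', 'consumption']

    # Per-field name/type/special metadata.
    for meta in reader.getFields():
        print(meta.name, meta.type, meta.special)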


def getBookmark(self):
""" Returns a bookmark to the current position
"""
:returns: a bookmark to the current position
"""
return self._aggBookmark
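
A hedged sketch of the checkpoint-and-resume pattern bookmarks enable, assuming the ``streamDef`` from the constructor example:

.. code-block:: python

    # Read part of the stream, then remember where we stopped.
    reader = StreamReader(streamDef)
    for _ in range(100):
        if reader.getNextRecord() is None:
            break
    bookmark = reader.getBookmark()

    # Later, a new reader resumes from the saved position; the bookmark
    # overrides any first_record setting in the stream definition.
    resumed = StreamReader(streamDef, bookmark=bookmark)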

@@ -463,10 +466,11 @@ def clearStats(self):


def getStats(self):
""" Returns stats (like min and max values of the fields).

"""
TODO: This method needs to be enhanced to get the stats on the *aggregated*
records.

:returns: stats (like min and max values of the fields).
"""

# The record store returns a dict of stats, each value in this dict is
@@ -490,37 +494,46 @@ def getStats(self):


def getError(self):
""" Returns errors saved in the stream.
"""
:returns: errors saved in the stream.
"""
return self._recordStore.getError()


def setError(self, error):
""" Saves specified error in the stream.

:param error: the error message to save
"""
self._recordStore.setError(error)


def isCompleted(self):
""" Returns True if all records have been read.
"""
:returns: True if all records have been read.
"""
return self._recordStore.isCompleted()


def setCompleted(self, completed=True):
""" Marks the stream completed (True or False)
"""
Marks the stream completed (True or False)

:param completed: (bool) whether the stream is completed
"""
# CSV file is always considered completed, nothing to do
self._recordStore.setCompleted(completed)


def setTimeout(self, timeout):
""" Set the read timeout """
"""Set the read timeout.

:param timeout: (float or int) timeout length
"""
self._recordStore.setTimeout(timeout)


def flush(self):
""" Flush the file to disk """
raise RuntimeError("Not implemented in StreamReader")

