diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1dcc3d4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +build/ +dist/ +pyEDF.egg-info/ +*.pyc +*.*~ diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..92bf6b8 --- /dev/null +++ b/README.rst @@ -0,0 +1,27 @@ +pyEDF +===== + +Python package to read and write EEG data in European Data Format (EDF) +files. It is implemented in pure Python with very limited dependencies +on external packages. + +See also +-------- + +- https://github.com/holgern/pyedflib +- https://github.com/MNE-tools/MNE-python + +Assuming a standard Python environment is installed on your machine +(including pip), pyEDF can be installed from PyPI: + +:: + + python -m pip install --user --upgrade pyEDF + +For the current pre-release version, you can install from GitHub: + +:: + + python -m pip install --user --upgrade git+https://github.com/robertoostenveld/pyEDF.git + +Dependencies should be handled automatically by pip. diff --git a/EDF.py b/pyEDF/EDF.py similarity index 50% rename from EDF.py rename to pyEDF/EDF.py index 7f08e02..a6ba62f 100644 --- a/EDF.py +++ b/pyEDF/EDF.py @@ -1,35 +1,73 @@ +#! /usr/bin/env python +''' +Copyright (C) 2016-2018 Robert Oostenveld + 2018 Phillip Alday + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +''' + +# TODO: add `from __future__ import division` after writing basic unit tests +# to discover any issues the changed behaviour may cause.
+from __future__ import print_function + from copy import deepcopy -from math import ceil, floor from struct import pack, unpack -import calendar -import datetime import numpy as np import os +import os.path as op import re -import warnings +from warnings import warn + def padtrim(buf, num): num -= len(buf) - if num>=0: + # for Python 3 compatibility, we have to explicitly convert str to bytes + if num >= 0: # pad the input to the specified length - return str(buf) + ' ' * num + return str(buf).encode() + b' ' * num else: # trim the input to the specified length - return buf[0:num] + return (buf[0:num]).encode() -#################################################################################################### -# the EDF header is represented as a tuple of (meas_info, chan_info) -# meas_info should have ['record_length', 'magic', 'hour', 'subject_id', 'recording_id', 'n_records', 'month', 'subtype', 'second', 'nchan', 'data_size', 'data_offset', 'lowpass', 'year', 'highpass', 'day', 'minute'] -# chan_info should have ['physical_min', 'transducers', 'physical_max', 'digital_max', 'ch_names', 'n_samps', 'units', 'digital_min'] -#################################################################################################### +############################################################################## +# the EDF header is represented as a tuple of (meas_info, chan_info) +# The fields in meas_info are ['record_length', 'magic', 'hour', 'subject_id', +# 'recording_id', 'n_records', 'month', 'subtype', 'second', 'nchan', +# 'data_size', 'data_offset', 'lowpass', 'year', 'highpass', 'day', 'minute'] +# The fields in chan_info are ['physical_min', 'transducers', 'physical_max', +# 'digital_max', 'ch_names', 'n_samps', 'units', 'digital_min'] +############################################################################## class EDFWriter(): def __init__(self, fname=None): self.fname = None self.meas_info = None self.chan_info = None self.calibrate = None - self.offset = None + self.offset = None self.n_records = 0 if fname: self.open(fname) @@ -52,18 +90,20 @@ def close(self): with open(self.fname, 'wb') as fid2: assert(fid2.tell() == 0) fid2.write(fid1.read(236)) - fid1.read(8) # skip this part - fid2.write(padtrim(str(self.n_records), 8)) # but write this instead + # skip this bit + fid1.read(8) + # but write this instead + fid2.write(padtrim(str(self.n_records), 8)) fid2.write(fid1.read(meas_info['data_offset'] - 236 - 8)) - blocksize = np.sum(chan_info['n_samps']) * meas_info['data_size'] + bsize = np.sum(chan_info['n_samps']) * meas_info['data_size'] for block in range(self.n_records): - fid2.write(fid1.read(blocksize)) + fid2.write(fid1.read(int(bsize))) os.remove(tempname) self.fname = None self.meas_info = None self.chan_info = None self.calibrate = None - self.offset = None + self.offset = None self.n_records = 0 return @@ -76,18 +116,24 @@ def writeHeader(self, header): assert(fid.tell() == 0) # fill in the missing or incomplete information - if not 'subject_id' in meas_info: + if 'subject_id' not in meas_info: meas_info['subject_id'] = '' - if not 'recording_id' in meas_info: + + if 'recording_id' not in meas_info: meas_info['recording_id'] = '' - if not 'subtype' in meas_info: + + if 'subtype' not in meas_info: meas_info['subtype'] = 'edf' nchan = meas_info['nchan'] - if not 'ch_names' in chan_info or len(chan_info['ch_names'])2d}.{:0>2d}.{:0>2d}'.format(meas_info['day'], meas_info['month'], meas_info['year']), 8)) - 
fid.write(padtrim('{:0>2d}.{:0>2d}.{:0>2d}'.format(meas_info['hour'], meas_info['minute'], meas_info['second']), 8)) + dmy = '{:0>2d}.{:0>2d}.{:0>2d}'.format(meas_info['day'], + meas_info['month'], + meas_info['year']) + fid.write(padtrim(dmy, 8)) + hms = '{:0>2d}.{:0>2d}.{:0>2d}'.format(meas_info['hour'], + meas_info['minute'], + meas_info['second']) + fid.write(padtrim(hms, 8)) fid.write(padtrim(str(meas_size + chan_size), 8)) - fid.write(' ' * 44) - fid.write(padtrim(str(-1), 8)) # the final n_records should be inserted on byte 236 + fid.write(padtrim(meas_info['subtype'], 44)) + # the final n_records should be inserted on byte 236 + fid.write(padtrim(str(-1), 8)) fid.write(padtrim(str(meas_info['record_length']), 8)) fid.write(padtrim(str(meas_info['nchan']), 4)) # ensure that these are all np arrays rather than lists - for key in ['physical_min', 'transducers', 'physical_max', 'digital_max', 'ch_names', 'n_samps', 'units', 'digital_min']: + for key in ['physical_min', 'transducers', 'physical_max', + 'digital_max', 'ch_names', 'n_samps', 'units', + 'digital_min']: chan_info[key] = np.asarray(chan_info[key]) for i in range(meas_info['nchan']): - fid.write(padtrim( chan_info['ch_names'][i], 16)) + fid.write(padtrim(chan_info['ch_names'][i], 16)) for i in range(meas_info['nchan']): - fid.write(padtrim( chan_info['transducers'][i], 80)) + fid.write(padtrim(chan_info['transducers'][i], 80)) for i in range(meas_info['nchan']): - fid.write(padtrim( chan_info['units'][i], 8)) + fid.write(padtrim(chan_info['units'][i], 8)) for i in range(meas_info['nchan']): fid.write(padtrim(str(chan_info['physical_min'][i]), 8)) for i in range(meas_info['nchan']): @@ -125,38 +180,43 @@ def writeHeader(self, header): for i in range(meas_info['nchan']): fid.write(padtrim(str(chan_info['digital_max'][i]), 8)) for i in range(meas_info['nchan']): - fid.write(' ' * 80) # prefiltering + fid.write(b' ' * 80) # prefiltering for i in range(meas_info['nchan']): fid.write(padtrim(str(chan_info['n_samps'][i]), 8)) for i in range(meas_info['nchan']): - fid.write(' ' * 32) # reserved + fid.write(b' ' * 32) # reserved meas_info['data_offset'] = fid.tell() self.meas_info = meas_info self.chan_info = chan_info - self.calibrate = (chan_info['physical_max'] - chan_info['physical_min'])/(chan_info['digital_max'] - chan_info['digital_min']); - self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min']; - channels = list(range(meas_info['nchan'])) - for ch in channels: - if self.calibrate[ch]<0: - self.calibrate[ch] = 1; - self.offset[ch] = 0; + self.calibrate = chan_info['physical_max'] - chan_info['physical_min'] + self.calibrate /= (chan_info['digital_max'] - chan_info['digital_min']) + self.offset = chan_info['physical_min'] + self.offset -= self.calibrate * chan_info['digital_min'] + + for ch in range(meas_info['nchan']): + if self.calibrate[ch] < 0: + self.calibrate[ch] = 1 + self.offset[ch] = 0 def writeBlock(self, data): meas_info = self.meas_info chan_info = self.chan_info + with open(self.fname, 'ab') as fid: - assert(fid.tell() > 0) + assert fid.tell() > 0 + for i in range(meas_info['nchan']): raw = deepcopy(data[i]) - assert(len(raw)==chan_info['n_samps'][i]) - if min(raw)<chan_info['physical_min'][i]: - warnings.warn('Value exceeds physical_min: '+ str(min(raw))); - if max(raw)>chan_info['physical_max'][i]: - warnings.warn('Value exceeds physical_max: '+ str(max(raw))); + assert len(raw) == chan_info['n_samps'][i] + if min(raw) < chan_info['physical_min'][i]: + warn('Value exceeds physical_min: {}'.format(min(raw))) + if max(raw) > chan_info['physical_max'][i]: + warn('Value exceeds physical_max:
{}'.format(max(raw))) - raw -= self.offset[i] # FIXME I am not sure about the order of calibrate and offset + # FIXME I am not sure about the order of calibrate and offset + raw -= self.offset[i] raw /= self.calibrate[i] raw = np.asarray(raw, dtype=np.int16) @@ -165,7 +225,6 @@ def writeBlock(self, data): fid.write(val) self.n_records += 1 -#################################################################################################### class EDFReader(): def __init__(self, fname=None): @@ -173,7 +232,7 @@ def __init__(self, fname=None): self.meas_info = None self.chan_info = None self.calibrate = None - self.offset = None + self.offset = None if fname: self.open(fname) @@ -189,69 +248,79 @@ def close(self): self.meas_info = None self.chan_info = None self.calibrate = None - self.offset = None + self.offset = None def readHeader(self): - # the following is copied over from MNE-Python and subsequently modified - # to more closely reflect the native EDF standard + # the following is copied over from MNE-Python and subsequently + # modified to more closely reflect the native EDF standard meas_info = {} chan_info = {} with open(self.fname, 'rb') as fid: - assert(fid.tell() == 0) + assert fid.tell() == 0 - meas_info['magic'] = fid.read(8).strip().decode() - meas_info['subject_id'] = fid.read(80).strip().decode() # subject id - meas_info['recording_id'] = fid.read(80).strip().decode() # recording id + meas_info['magic'] = fid.read(8).strip().decode() + meas_info['subject_id'] = fid.read(80).strip().decode() + meas_info['recording_id'] = fid.read(80).strip().decode() - day, month, year = [int(x) for x in re.findall('(\d+)', fid.read(8).decode())] - hour, minute, second = [int(x) for x in re.findall('(\d+)', fid.read(8).decode())] + day, month, year = [int(x) for x in + re.findall('(\d+)', fid.read(8).decode())] + hour, minute, second = [int(x) for x in + re.findall('(\d+)', fid.read(8).decode())] meas_info['day'] = day meas_info['month'] = month meas_info['year'] = year meas_info['hour'] = hour meas_info['minute'] = minute meas_info['second'] = second - # date = datetime.datetime(year + 2000, month, day, hour, minute, sec) - # meas_info['meas_date'] = calendar.timegm(date.utctimetuple()) - meas_info['data_offset'] = header_nbytes = int(fid.read(8).decode()) + meas_info['data_offset'] = header_nbytes = int(fid.read(8).decode()) # noqa:E501 subtype = fid.read(44).strip().decode()[:5] if len(subtype) > 0: meas_info['subtype'] = subtype else: - meas_info['subtype'] = os.path.splitext(self.fname)[1][1:].lower() + meas_info['subtype'] = op.splitext(self.fname)[1][1:].lower() if meas_info['subtype'] in ('24BIT', 'bdf'): meas_info['data_size'] = 3 # 24-bit (3 byte) integers else: meas_info['data_size'] = 2 # 16-bit (2 byte) integers - meas_info['n_records'] = n_records = int(fid.read(8).decode()) + meas_info['n_records'] = int(fid.read(8).decode()) # record length in seconds record_length = float(fid.read(8).decode()) if record_length == 0: meas_info['record_length'] = record_length = 1. - warnings.warn('Headermeas_information is incorrect for record length. ' - 'Default record length set to 1.') + warn('Header information is incorrect for record length.
' + 'Default record length set to 1.') else: meas_info['record_length'] = record_length meas_info['nchan'] = nchan = int(fid.read(4).decode()) - channels = list(range(nchan)) - chan_info['ch_names'] = [fid.read(16).strip().decode() for ch in channels] - chan_info['transducers'] = [fid.read(80).strip().decode() for ch in channels] - chan_info['units'] = [fid.read(8).strip().decode() for ch in channels] - chan_info['physical_min'] = physical_min = np.array([float(fid.read(8).decode()) for ch in channels]) - chan_info['physical_max'] = physical_max = np.array([float(fid.read(8).decode()) for ch in channels]) - chan_info['digital_min'] = digital_min = np.array([float(fid.read(8).decode()) for ch in channels]) - chan_info['digital_max'] = digital_max = np.array([float(fid.read(8).decode()) for ch in channels]) - - prefiltering = [fid.read(80).strip().decode() for ch in channels][:-1] - highpass = np.ravel([re.findall('HP:\s+(\w+)', filt) for filt in prefiltering]) - lowpass = np.ravel([re.findall('LP:\s+(\w+)', filt) for filt in prefiltering]) + chs = list(range(nchan)) + + def _read_chan_byte(): + return np.array([float(fid.read(8).decode()) for ch in chs]) + + chan_info['ch_names'] = [fid.read(16).strip().decode() + for ch in chs] + chan_info['transducers'] = [fid.read(80).strip().decode() + for ch in chs] + chan_info['units'] = [fid.read(8).strip().decode() for ch in chs] + + chan_info['physical_min'] = _read_chan_byte() + chan_info['physical_max'] = _read_chan_byte() + chan_info['digital_min'] = _read_chan_byte() + chan_info['digital_max'] = _read_chan_byte() + + prefiltering = [fid.read(80).strip().decode() for ch in chs][:-1] + highpass = np.ravel([re.findall('HP:\s+(\w+)', filt) + for filt in prefiltering]) + lowpass = np.ravel([re.findall('LP:\s+(\w+)', filt) + for filt in prefiltering]) high_pass_default = 0. + if highpass.size == 0: meas_info['highpass'] = high_pass_default elif all(highpass): @@ -263,8 +332,8 @@ def readHeader(self): meas_info['highpass'] = float(highpass[0]) else: meas_info['highpass'] = float(np.max(highpass)) - warnings.warn('Channels contain different highpass filters. ' - 'Highest filter setting will be stored.') + warn('Channels contain different highpass filters. ' + 'Highest filter setting will be stored.') if lowpass.size == 0: meas_info['lowpass'] = None @@ -275,64 +344,80 @@ def readHeader(self): meas_info['lowpass'] = float(lowpass[0]) else: meas_info['lowpass'] = float(np.min(lowpass)) - warnings.warn('%s' % ('Channels contain different lowpass filters.' - ' Lowest filter setting will be stored.')) + warn('%s' % ('Channels contain different lowpass filters.' 
+ ' Lowest filter setting will be stored.')) # noqa:E127 # number of samples per record - chan_info['n_samps'] = n_samps = np.array([int(fid.read(8).decode()) for ch in channels]) + chan_info['n_samps'] = n_samps = _read_chan_byte() - fid.read(32 *meas_info['nchan']).decode() # reserved + fid.read(32 * meas_info['nchan']).decode() # reserved assert fid.tell() == header_nbytes - if meas_info['n_records']==-1: - # this happens if the n_records is not updated at the end of recording - tot_samps = (os.path.getsize(self.fname)-meas_info['data_offset'])/meas_info['data_size'] - meas_info['n_records'] = tot_samps/sum(n_samps) + if meas_info['n_records'] == -1: + # this happens if n_records isn't updated at recording end + tot_samps = op.getsize(self.fname) - meas_info['data_offset'] + tot_samps /= meas_info['data_size'] + meas_info['n_records'] = tot_samps / sum(n_samps) + + self.calibrate = chan_info['physical_max'] - chan_info['physical_min'] + self.calibrate /= chan_info['digital_max'] - chan_info['digital_min'] - self.calibrate = (chan_info['physical_max'] - chan_info['physical_min'])/(chan_info['digital_max'] - chan_info['digital_min']); - self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min']; - for ch in channels: - if self.calibrate[ch]<0: - self.calibrate[ch] = 1; - self.offset[ch] = 0; + self.offset = chan_info['physical_min'] + self.offset -= self.calibrate * chan_info['digital_min'] + + for ch in chs: + if self.calibrate[ch] < 0: + self.calibrate[ch] = 1 + self.offset[ch] = 0 self.meas_info = meas_info self.chan_info = chan_info return (meas_info, chan_info) def readBlock(self, block): - assert(block>=0) - meas_info = self.meas_info + assert block >= 0 + chan_info = self.chan_info + meas_info = self.meas_info data = [] + with open(self.fname, 'rb') as fid: assert(fid.tell() == 0) blocksize = np.sum(chan_info['n_samps']) * meas_info['data_size'] - fid.seek(meas_info['data_offset'] + block * blocksize) + fid.seek(int(meas_info['data_offset'] + block * blocksize)) for i in range(meas_info['nchan']): - buf = fid.read(chan_info['n_samps'][i]*meas_info['data_size']) - raw = np.asarray(unpack('<{}h'.format(chan_info['n_samps'][i]), buf), dtype=np.float32) + bufsize = int(chan_info['n_samps'][i] * meas_info['data_size']) + buf = fid.read(bufsize) + raw = unpack('<{}h'.format(int(chan_info['n_samps'][i])), buf) + raw = np.asarray(raw, dtype=np.float32) + # FIXME I am not sure about the order of calibrate and offset raw *= self.calibrate[i] - raw += self.offset[i] # FIXME I am not sure about the order of calibrate and offset + raw += self.offset[i] data.append(raw) return data def readSamples(self, channel, begsample, endsample): - meas_info = self.meas_info chan_info = self.chan_info n_samps = chan_info['n_samps'][channel] - begblock = int(floor((begsample) / n_samps)) - endblock = int(floor((endsample) / n_samps)) + + # typecast to int is truncation from float, so there's no need for + # explicit floor() + begblock = int(begsample / n_samps) + endblock = int(endsample / n_samps) + data = self.readBlock(begblock)[channel] - for block in range(begblock+1, endblock+1): + + for block in range(begblock + 1, endblock + 1): data = np.append(data, self.readBlock(block)[channel]) - begsample -= begblock*n_samps - endsample -= begblock*n_samps + + begsample -= begblock * n_samps + endsample -= begblock * n_samps + return data[begsample:(endsample+1)] -#################################################################################################### -# the following are a 
number of helper functions to make the behaviour of this EDFReader -# class more similar to https://bitbucket.org/cleemesser/python-edf/ -#################################################################################################### +############################################################################### +# the following are a number of helper functions to make the behaviour of +# this EDFReader class similar to https://bitbucket.org/cleemesser/python-edf/ +############################################################################## def getSignalTextLabels(self): # convert from unicode to string @@ -348,34 +433,12 @@ def getNSamples(self): return self.chan_info['n_samps'] * self.meas_info['n_records'] def readSignal(self, chanindx): - begsample = 0; - endsample = self.chan_info['n_samps'][chanindx] * self.meas_info['n_records'] - 1; - return self.readSamples(chanindx, begsample, endsample) - -#################################################################################################### - -if False: - file_in = EDFReader() - file_in.open('/Users/roboos/day 01[10.03].edf') - print file_in.readSamples(0, 0, 0) - print file_in.readSamples(0, 0, 128) - + begsample = 0 -if False: - file_in = EDFReader() - file_in.open('/Users/roboos/test_generator.edf') + n_samps = self.chan_info['n_samps'][chanindx] + n_records = self.meas_info['n_records'] + endsample = n_samps * n_records - 1 - file_out = EDFWriter() - file_out.open('/Users/roboos/test_generator copy.edf') - - header = file_in.readHeader() - - file_out.writeHeader(header) - - meas_info = header[0] - for i in range(meas_info['n_records']): - data = file_in.readBlock(i) - file_out.writeBlock(data) + return self.readSamples(chanindx, begsample, endsample) - file_in.close() - file_out.close() +############################################################################## diff --git a/pyEDF/__init__.py b/pyEDF/__init__.py new file mode 100644 index 0000000..aea1139 --- /dev/null +++ b/pyEDF/__init__.py @@ -0,0 +1,2 @@ +__version__ = 0.1 +from .EDF import EDFWriter, EDFReader diff --git a/pyEDF/tests/config_vars.py b/pyEDF/tests/config_vars.py new file mode 100644 index 0000000..ebd8091 --- /dev/null +++ b/pyEDF/tests/config_vars.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Phillip Alday +# License: BSD (3-clause) + +import os.path as op +data_path = op.expanduser('~/edf_data') diff --git a/pyEDF/tests/test_edfreader.py b/pyEDF/tests/test_edfreader.py new file mode 100644 index 0000000..f2bc276 --- /dev/null +++ b/pyEDF/tests/test_edfreader.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Phillip Alday +# License: BSD (3-clause) +"""Tests for reading EDF files""" + +from nose.tools import assert_dict_equal, assert_raises + +from config_vars import data_path +from os.path import join diff --git a/pyEDF/tests/test_edfwriter.py b/pyEDF/tests/test_edfwriter.py new file mode 100644 index 0000000..6b7a436 --- /dev/null +++ b/pyEDF/tests/test_edfwriter.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Phillip Alday +# License: BSD (3-clause) +"""Tests for writing EDF files""" + +from nose.tools import assert_dict_equal, assert_raises + +from config_vars import data_path +from os.path import join diff --git a/pyEDF/tests/test_roundtrip.py b/pyEDF/tests/test_roundtrip.py new file mode 100644 index 0000000..23f610f --- /dev/null +++ b/pyEDF/tests/test_roundtrip.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2018 Phillip Alday +# License: BSD (3-clause) +"""Round Trip 
Data IO tests. + + Note that these tests are *slow*. The entirety of each file is effectively + read in 3 times and written to disk once. +""" + +from nose.tools import assert_dict_equal, assert_true, assert_sequence_equal +import numpy as np + +from config_vars import data_path +from os.path import join + +import pyEDF + +try: + # Python 2 + from os import tempnam, unlink +except ImportError: + # Python 3 + from os import unlink + from tempfile import NamedTemporaryFile + + tempnam = lambda: NamedTemporaryFile(delete=False).name + +def _roundtrip(edf_file): + edfin = pyEDF.EDFReader(edf_file) + + fout = tempnam() + edfout = pyEDF.EDFWriter(fout) + + header = edfin.readHeader() + edfout.writeHeader(header) + + meas_info = header[0] + for i in range(int(meas_info['n_records'])): + edfout.writeBlock(edfin.readBlock(i)) + + edfin.close() + edfout.close() + + original = pyEDF.EDFReader(edf_file) + copy = pyEDF.EDFReader(fout) + + copy_header = copy.readHeader() + + assert_dict_equal(header[0], copy_header[0]) + for key in header[1]: + assert_sequence_equal(list(header[1][key]), list(copy_header[1][key])) + + for i in range(int(meas_info['n_records'])): + # although this is floating point, it's not really numerics. + # it's just comparing copies of the same data, so exact equality + # should be a doable goal. + for ch_orig, ch_copy in zip(original.readBlock(i), copy.readBlock(i)): + assert_sequence_equal(list(ch_orig), list(ch_copy)) + + unlink(fout) + +def test_roundtrip_0601_s(): + '''Roundtrip of file 0601_s.edf''' + # this file seems to have bad physical_min values + _roundtrip(join(data_path, '0601_s.edf')) + +def test_roundtrip_composition1_0s_to_1892s_fs20_15channels_tap127(): + '''Roundtrip of file composition1_0s_to_1892s_fs20_15channels_tap127.edf''' + _roundtrip(join(data_path, 'composition1_0s_to_1892s_fs20_15channels_tap127.edf')) + + +def test_roundtrip_NY394_VisualLoc_R1(): + '''Roundtrip of file NY394_VisualLoc_R1.edf''' + _roundtrip(join(data_path, 'NY394_VisualLoc_R1.edf')) + + +def test_roundtrip_shhs1_200001(): + '''Roundtrip of file shhs1-200001.edf''' + _roundtrip(join(data_path, 'shhs1-200001.edf')) + +def test_roundtrip_testAlphaIR20170321_0(): + '''Roundtrip of file testAlphaIR20170321-0.edf''' + _roundtrip(join(data_path, 'testAlphaIR20170321-0.edf')) + +def test_roundtrip_test_generator(): + '''Roundtrip of file test_generator.edf''' + _roundtrip(join(data_path, 'test_generator.edf')) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..68d6c08 --- /dev/null +++ b/setup.py @@ -0,0 +1,101 @@ +#! /usr/bin/env python +# +# Copyright (C) 2018 Phillip Alday +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. + +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import os +from os import path as op + +from setuptools import setup + +PACKAGE_NAME = 'pyEDF' + +# get the version (don't import the package here, so dependencies are not needed) +version = None +with open(os.path.join(PACKAGE_NAME, '__init__.py'), 'r') as fid: + for line in (line.strip() for line in fid): + if line.startswith('__version__'): + version = line.split('=')[1].strip().strip('\'') + break +if version is None: + raise RuntimeError('Could not determine version') + +descr = """Reader and Writer for the European Data Format (EDF)""" + +DISTNAME = PACKAGE_NAME +DESCRIPTION = descr +AUTHOR = 'Robert Oostenveld' +AUTHOR_EMAIL = 'r.oostenveld@donders.ru.nl' +MAINTAINER = 'Phillip Alday' +MAINTAINER_EMAIL = 'me@phillipalday.com' +URL = 'https://github.com/robertoostenveld/pyEDF' +LICENSE = 'BSD (3-clause)' +DOWNLOAD_URL = 'https://github.com/robertoostenveld/pyEDF' +VERSION = version + +def package_tree(pkgroot): +    """Get the submodule list.""" +    # Adapted from MNE-Python +    path = os.path.dirname(__file__) +    subdirs = [os.path.relpath(i[0], path).replace(os.path.sep, '.') +               for i in os.walk(os.path.join(path, pkgroot)) +               if '__init__.py' in i[2]] +    return sorted(subdirs) + +if __name__ == "__main__": +    if os.path.exists('MANIFEST'): +        os.remove('MANIFEST') + +    setup(name=DISTNAME, +          author=AUTHOR, +          author_email=AUTHOR_EMAIL, +          maintainer=MAINTAINER, +          maintainer_email=MAINTAINER_EMAIL, +          include_package_data=True, +          description=DESCRIPTION, +          license=LICENSE, +          url=URL, +          version=VERSION, +          download_url=DOWNLOAD_URL, +          long_description=open('README.rst').read(), +          zip_safe=True,  # the package can run out of an .egg file +          classifiers=['Intended Audience :: Science/Research', +                       'Intended Audience :: Developers', +                       'License :: OSI Approved', +                       'Programming Language :: Python', +                       'Topic :: Software Development', +                       'Topic :: Scientific/Engineering', +                       'Operating System :: Microsoft :: Windows', +                       'Operating System :: POSIX', +                       'Operating System :: Unix', +                       'Operating System :: MacOS'], +          platforms='any', +          packages=package_tree(PACKAGE_NAME), +          )
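The removed `if False:` demo blocks at the bottom of the old EDF.py were the only usage documentation; with this change that role falls to the round-trip test. For reference, here is a minimal sketch of the same read/copy pattern against the new package layout. It is not part of the patch: the file names are hypothetical placeholders, and the int() cast around n_records mirrors the tests, since readHeader() can recompute n_records as a float when the header field is -1.

#! /usr/bin/env python
# Minimal usage sketch for the renamed pyEDF package.
# 'recording.edf' and 'copy.edf' are hypothetical placeholder file names.
import pyEDF

reader = pyEDF.EDFReader('recording.edf')   # open an existing EDF file
writer = pyEDF.EDFWriter('copy.edf')        # file to be written

# the header is a (meas_info, chan_info) tuple, as described in pyEDF/EDF.py
header = reader.readHeader()
writer.writeHeader(header)

# copy the recording block by block, as in _roundtrip();
# n_records may be a float if it had to be recomputed from the file size
meas_info = header[0]
for block in range(int(meas_info['n_records'])):
    writer.writeBlock(reader.readBlock(block))

# the python-edf style helpers still work on the open reader
print(reader.getSignalTextLabels())          # channel names
first_channel = reader.readSignal(0)         # all samples of channel 0

reader.close()
writer.close()   # rewrites the header so n_records is filled in correctly

This is the same pattern as _roundtrip() in pyEDF/tests/test_roundtrip.py, minus the temporary-file handling and the equality assertions.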