Skip to content

Commit

Permalink
Fixed headers management for multiparts in tabular resource (#257)
Browse files Browse the repository at this point in the history
* discarding header row of tabular multipart chunks
relates #256

* using streams correctly in python 3.x

* Remove first row in chunk only if != header\n see https://github.com/frictionlessdata/datapackage-py/pull/257#pullrequestreview-375804017\n and https://github.com/frictionlessdata/forum/issues/1

* bug in row iteration

* better warning message

* use iterator for streams in multipart fixes #246

* linter compatible
  • Loading branch information
paulgirard authored Aug 14, 2020
1 parent d41605e commit ab08bd3
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 11 deletions.
2 changes: 2 additions & 0 deletions data/chunk2-with-headers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
id,name
2,中国人
44 changes: 37 additions & 7 deletions datapackage/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def raw_iter(self, stream=False):

# Get filelike
if self.multipart:
filelike = _MultipartSource(self.source, remote=self.remote)
filelike = _MultipartSource(self)
elif self.remote:
if self.__table_options.get('http_session'):
http_session = self.__table_options['http_session']
Expand Down Expand Up @@ -657,7 +657,7 @@ def __get_table(self):
# Get source/schema
source = self.source
if self.multipart:
source = _MultipartSource(self.source, remote=self.remote)
source = _MultipartSource(self)
schema = self.__current_descriptor.get('schema')

# Storage resource
Expand Down Expand Up @@ -851,9 +851,17 @@ class _MultipartSource(object):

# Public

def __init__(self, source, remote=False):
self.__source = source
self.__remote = remote
def __init__(self, resource):
# testing if we have headers
if resource.tabular \
and (resource.descriptor.get('dialect') and resource.descriptor.get('dialect').get('header')
or (not resource.descriptor.get('dialect') and config.DEFAULT_DIALECT['header'])):
remove_chunk_header_row = True
else:
remove_chunk_header_row = False
self.__source = resource.source
self.__remote = resource.remote
self.__remove_chunk_header_row = remove_chunk_header_row
self.__rows = self.__iter_rows()

def __enter__(self):
Expand Down Expand Up @@ -910,8 +918,30 @@ def __iter_rows(self):
streams = (urlopen(chunk) for chunk in self.__source)
else:
streams = (io.open(chunk, 'rb') for chunk in self.__source)
for stream in streams:
firstStream = True
header_row = None
for stream, chunk in zip(streams, self.__source):
firstRow = True
for row in stream:
if not row.endswith(b'\n'):
row += b'\n'
yield row
# if tabular, skip header row in the concatenation stream
if firstRow and self.__remove_chunk_header_row:
if firstStream:
# store the first stream header row and yield it
header_row = row
yield row
elif row == header_row:
# remove header row of new stream is same as header from first stream
pass
else:
# yield this first row but warn the user for deprecated situation
# TODO: this warning might be removed in future releases ?
warnings.warn("""%s has no headers whereas header = True.
Deprecated legacy multi-part mode for tabular data.
Headers will be required in chunks/multiparts in future.""" % chunk, UserWarning)
yield row
else:
yield row
firstRow = False
firstStream = False
39 changes: 35 additions & 4 deletions tests/test_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ def test_descriptor_table_tabular_multipart_local():
descriptor = {
'name': 'name',
'profile': 'tabular-data-resource',
'path': ['chunk1.csv', 'chunk2.csv'],
'path': ['chunk1.csv', 'chunk2-with-headers.csv'],
'schema': 'resource_schema.json',
}
resource = Resource(descriptor, base_path='data')
Expand All @@ -490,6 +490,36 @@ def test_descriptor_table_tabular_multipart_local():
{'id': 2, 'name': '中国人'},
]

def test_descriptor_table_tabular_multipart_noheader_local():
descriptor = {
'name': 'name',
'profile': 'tabular-data-resource',
'path': ['chunk2.csv', 'chunk2.csv'],
'schema': 'resource_schema.json',
'dialect': {'header': False},
}
resource = Resource(descriptor, base_path='data')
assert resource.table.read(keyed=True) == [
{'id': 2, 'name': '中国人'},
{'id': 2, 'name': '中国人'},
]

# test warning on legacy multipart header
# TODO: to remove in future release ?
def test_descriptor_table_tabular_multipart_mix_header_local():
descriptor = {
'name': 'name',
'profile': 'tabular-data-resource',
'path': ['chunk1.csv', 'chunk2.csv'],
'schema': 'resource_schema.json',
'dialect': {'header': True},
}
with pytest.warns(UserWarning):
resource = Resource(descriptor, base_path='data')
assert resource.table.read(keyed=True) == [
{'id': 1, 'name': 'english'},
{'id': 2, 'name': '中国人'},
]

def test_descriptor_table_tabular_multipart_remote(patch_get):
descriptor = {
Expand All @@ -503,14 +533,15 @@ def test_descriptor_table_tabular_multipart_remote(patch_get):
'schema': 'resource_schema.json',
}
# Mocks
patch_get('http://example.com/chunk1.csv', body="id,name\n")
patch_get('http://example.com/chunk2.csv', body="1,english")
patch_get('http://example.com/chunk3.csv', body="2,中国人\n")
patch_get('http://example.com/chunk1.csv', body="id,name\n1,english")
patch_get('http://example.com/chunk2.csv', body="id,name\n2,中国人\n")
patch_get('http://example.com/chunk3.csv', body="id,name\n3,français\n")
# Tests
resource = Resource(descriptor, base_path='data')
assert resource.table.read(keyed=True) == [
{'id': 1, 'name': 'english'},
{'id': 2, 'name': '中国人'},
{'id': 3, 'name': 'français'},
]


Expand Down

0 comments on commit ab08bd3

Please sign in to comment.