Add multiple dimension slicing/indexing for HDF5 (tensorflow#795)
* Add multiple dimension slicing/indexing for HDF5

This PR adds multiple-dimension slicing/indexing for the HDF5 IOTensor.
With this PR, it is possible to select a chunk of data across different
dimensions of the HDF5IOTensor:
```
tensor = tfio.IOTensor.from_hdf5(...)
tensor[:, 3, 2:8]
```
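
For illustration, a minimal end-to-end sketch of the new behavior (the file name, dataset name, and sample data below are made up for this example and are not part of the change):
```
import numpy as np
import h5py
import tensorflow_io as tfio

# Write a small 2-D dataset to a hypothetical HDF5 file.
with h5py.File("example.h5", "w") as f:
    f.create_dataset("data", data=np.arange(100.0).reshape(10, 10))

# Open it as an IOTensor and slice across multiple dimensions.
tensor = tfio.IOTensor.from_hdf5("example.h5")("/data")
chunk = tensor[2:5, 3:8]  # rows 2..4 and columns 3..7, shape (3, 5)
row = tensor[4]           # an integer index collapses that dimension, shape (10,)
```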

This PR fixes #754.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Fix typo

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
yongtang authored Feb 20, 2020
1 parent 9434ce7 commit e57b5ff
Showing 3 changed files with 109 additions and 45 deletions.
78 changes: 42 additions & 36 deletions tensorflow_io/core/kernels/hdf5_kernels.cc
@@ -280,12 +280,12 @@ class HDF5ReadableResource : public ResourceBase {
*dtype = dtypes_[column_index];
return Status::OK();
}
Status Read(const string& component, const int64 start,
Status Read(const string& component,
const absl::InlinedVector<int64, 4>& start,
const TensorShape& shape,
std::function<Status(const TensorShape& shape, Tensor** value)>
allocate_func) {
mutex_lock l(mu);

std::unordered_map<std::string, int64>::const_iterator lookup =
columns_index_.find(component);
if (lookup == columns_index_.end()) {
@@ -296,26 +296,6 @@ class HDF5ReadableResource : public ResourceBase {
Tensor* value;
TF_RETURN_IF_ERROR(allocate_func(shape, &value));

if (shape.dims() > 0) {
if (shape.dim_size(0) == 0) {
return Status::OK();
}

int64 element_start = start;

if (element_start > shapes_[column_index].dim_size(0)) {
return errors::InvalidArgument(
"start ", element_start,
" out of boundary: ", shapes_[column_index]);
}
int64 element_stop = element_start + shape.dim_size(0);
if (element_stop > shapes_[column_index].dim_size(0)) {
return errors::InvalidArgument(
"start ", element_start, " and shape ", shape,
" out of boundary: ", shapes_[column_index]);
}
}

H5::H5File* file = file_image_->GetFile();
try {
H5::DataSet data_set = file->openDataSet(component);
@@ -326,19 +306,30 @@ class HDF5ReadableResource : public ResourceBase {

if (shape.dims() != 0) {
int rank = data_space.getSimpleExtentNdims();
if (rank != shape.dims()) {
return errors::InvalidArgument("rank does not match: ", rank, " vs. ",
shape.dims());
}
absl::InlinedVector<hsize_t, 4> dims(rank);
data_space.getSimpleExtentDims(dims.data());
absl::InlinedVector<hsize_t, 4> dims_start(rank);

// Find the border of the dims start and dims
absl::InlinedVector<hsize_t, 4> dims_start(dims.size(), 0);
dims_start[0] = start;
dims[0] = shape.dim_size(0);
data_space.getSimpleExtentDims(dims.data());
for (int i = 0; i < rank; i++) {
if (start[i] > dims[i] || start[i] + shape.dim_size(i) > dims[i]) {
return errors::InvalidArgument(
"dimension [", i, "] out of boundary: start=", start[i],
", slice=", shape.dim_size(i), ", boundary=", dims[i]);
}
dims_start[i] = start[i];
dims[i] = shape.dim_size(i);
}

memory_space = H5::DataSpace(dims.size(), dims.data());

data_space.selectHyperslab(H5S_SELECT_SET, dims.data(),
dims_start.data());
}

switch (dtypes_[column_index]) {
case DT_UINT8:
data_set.read(value->flat<uint8>().data(), data_type, memory_space,
@@ -581,21 +572,36 @@ class HDF5ReadableReadOp : public OpKernel {

const Tensor* start_tensor;
OP_REQUIRES_OK(context, context->input("start", &start_tensor));
int64 start = start_tensor->scalar<int64>()();
absl::InlinedVector<int64, 4> start(shape.dims());
for (int64 i = 0; i < start_tensor->NumElements(); i++) {
start[i] = start_tensor->flat<int64>()(i);
}
for (int64 i = start_tensor->NumElements(); i < shape.dims(); i++) {
start[i] = 0;
}

const Tensor* stop_tensor;
OP_REQUIRES_OK(context, context->input("stop", &stop_tensor));
int64 stop = stop_tensor->scalar<int64>()();
if (shape.dims() > 0) {
if (stop < 0 || stop > shape.dim_size(0)) {
stop = shape.dim_size(0);
absl::InlinedVector<int64, 4> stop(stop_tensor->shape().dims());
for (int64 i = 0; i < stop_tensor->NumElements(); i++) {
stop[i] = stop_tensor->flat<int64>()(i);
}
for (int64 i = stop_tensor->NumElements(); i < shape.dims(); i++) {
stop[i] = shape.dim_size(i);
}

for (int64 i = 0; i < shape.dims(); i++) {
if (stop[i] < 0 || stop[i] > shape.dim_size(i)) {
stop[i] = shape.dim_size(i);
}
if (start > stop) {
start = stop;
if (start[i] > stop[i]) {
start[i] = stop[i];
}

shape.set_dim(0, stop - start);
}
for (int64 i = 0; i < shape.dims(); i++) {
shape.set_dim(i, stop[i] - start[i]);
}

OP_REQUIRES_OK(
context,
resource->Read(component, start, shape,
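
Conceptually, the updated `Read` now takes a per-dimension start vector, checks each requested range against the dataset extent, and reads the region through an HDF5 hyperslab selection (`dims_start` as offsets, `dims` as counts). A rough h5py sketch of the same selection and bounds check, for orientation only (the helper name and file layout are illustrative, not code from this PR):
```
import h5py
import numpy as np

def read_chunk(filename, component, start, count):
    """Read the region with offset start[i] and extent count[i] along each
    dimension, mirroring dims_start/dims in the C++ kernel."""
    with h5py.File(filename, "r") as f:
        dataset = f[component]
        if len(start) != dataset.ndim:
            raise ValueError("rank does not match")
        for i, dim in enumerate(dataset.shape):
            if start[i] > dim or start[i] + count[i] > dim:
                raise ValueError(
                    "dimension [%d] out of boundary: start=%d, slice=%d, "
                    "boundary=%d" % (i, start[i], count[i], dim))
        # h5py turns this basic slicing into an HDF5 hyperslab selection,
        # analogous to DataSpace::selectHyperslab in the kernel.
        region = tuple(slice(s, s + c) for s, c in zip(start, count))
        return np.asarray(dataset[region])
```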
27 changes: 19 additions & 8 deletions tensorflow_io/core/python/ops/hdf5_io_tensor_ops.py
@@ -85,15 +85,26 @@ def to_tensor(self):
#=============================================================================
def __getitem__(self, key):
"""Returns the specified piece of this IOTensor."""
if isinstance(key, slice):
return core_ops.io_hdf5_readable_read(
self._resource, self._component, self._shape,
key.start, key.stop, dtype=self._dtype)
# always convert to tuple to process
if not isinstance(key, tuple):
key = tuple([key])
# get the start and stop of each element
indices = [
(k.start, k.stop) if isinstance(k, slice) else (k, k + 1) for k in key]
# get the start and stop, and use 0 (start) and -1 (stop) if needed
indices = list(zip(*indices))
start = [0 if e is None else e for e in indices[0]]
stop = [-1 if e is None else e for e in indices[1]]

item = core_ops.io_hdf5_readable_read(
self._resource, key, key + 1, dtype=self._dtype)
if tf.shape(item)[0] == 0:
raise IndexError("index %s is out of range" % key)
return item[0]
self._resource, self._component, self._shape,
start=start, stop=stop, dtype=self._dtype)

# in case certain dimension is not slice, then this dimension will need to
# collapse as `0`, otherwise `:` or `slice(None, None, None)`
indices = [slice(None) if isinstance(k, slice) else 0 for k in key]

return item.__getitem__(indices)

def __len__(self):
"""Returns the total number of items of this IOTensor."""
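
For reference, the key normalization performed by the new `__getitem__` can be summarized in a standalone sketch (the helper name is made up; this is not code from the PR):
```
def normalize_key(key):
    """Turn a __getitem__ key into per-dimension start/stop lists plus the
    per-dimension collapse spec, following HDF5IOTensor.__getitem__."""
    if not isinstance(key, tuple):
        key = (key,)
    # An integer index k becomes the single-element range [k, k + 1).
    pairs = [(k.start, k.stop) if isinstance(k, slice) else (k, k + 1)
             for k in key]
    start = [0 if s is None else s for s, _ in pairs]
    # -1 tells the kernel to substitute the full dimension size.
    stop = [-1 if e is None else e for _, e in pairs]
    # Integer-indexed dimensions are collapsed afterwards with 0; sliced
    # dimensions are kept with slice(None), i.e. `:`.
    collapse = [slice(None) if isinstance(k, slice) else 0 for k in key]
    return start, stop, collapse

# tensor[:, 3, 2:8] corresponds to:
start, stop, collapse = normalize_key((slice(None), 3, slice(2, 8)))
# start == [0, 3, 2], stop == [-1, 4, 8]
# collapse == [slice(None, None, None), 0, slice(None, None, None)]
```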
49 changes: 48 additions & 1 deletion tests/test_io_tensor_eager.py
@@ -278,6 +278,28 @@ def fin():

return args, func, expected

@pytest.fixture(name="hdf5_multiple_dimension", scope="module")
def fixture_hdf5_multiple_dimension(request):
"""fixture_hdf5_multiple_dimension"""
import h5py # pylint: disable=import-outside-toplevel

tmp_path = tempfile.mkdtemp()
filename = os.path.join(tmp_path, "test.h5")

data = [np.arange(i, i+20) for i in range(20)]

with h5py.File(filename, 'w') as f:
f.create_dataset('float64', data=np.asarray(data, np.float64), dtype='f8')
args = filename
def func(args):
return tfio.IOTensor.from_hdf5(args)('/float64')
expected = np.asarray(data, np.float64)

def fin():
shutil.rmtree(tmp_path)
request.addfinalizer(fin)

return args, func, expected

@pytest.fixture(name="arrow", scope="module")
def fixture_arrow():
@@ -296,7 +318,6 @@ def fixture_arrow():

return args, func, expected


# scalar is a special IOTensor that is alias to Tensor
@pytest.mark.parametrize(
("io_tensor_fixture"),
@@ -356,6 +377,32 @@ def test_io_tensor_slice(fixture_lookup, io_tensor_fixture):
for start, stop in list(zip(indices, indices[1:] + [len(expected)])):
assert np.array_equal(io_tensor[start:stop], expected[start:stop])

# full slicing/index across multiple dimensions
@pytest.mark.parametrize(
("io_tensor_fixture"),
[
pytest.param("hdf5_multiple_dimension"),
],
ids=[
"hdf5",
],
)
def test_io_tensor_slice_multiple_dimension(fixture_lookup, io_tensor_fixture):
"""test_io_tensor_slice_multiple_dimension"""
args, func, expected = fixture_lookup(io_tensor_fixture)

io_tensor = func(args)

# Test __getitem__, use 7 to partition dimension 0, 11 for dimension 1
indices_0 = list(range(0, len(expected), 7))
for start_0, stop_0 in list(zip(indices_0, indices_0[1:] + [len(expected)])):
indices_1 = list(range(0, len(expected[0]), 11))
for start_1, stop_1 in list(
zip(indices_1, indices_1[1:] + [len(expected[0])])):
assert np.array_equal(
io_tensor[start_0:stop_0, start_1:stop_1],
expected[start_0:stop_0, start_1:stop_1])

# slice (__getitem__) could also be inside dataset for GraphIOTensor
@pytest.mark.parametrize(
("io_tensor_fixture", "num_parallel_calls"),
