Skip to content

Commit

Permalink
Address code review
Browse files Browse the repository at this point in the history
Co-authored-by: Vyas Ramasubramani <vyasr@nvidia.com>
  • Loading branch information
lithomas1 and vyasr committed Jun 27, 2024
1 parent 8fc139f commit e940e30
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 106 deletions.
2 changes: 0 additions & 2 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.string cimport string

from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata
from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type
from cudf._lib.pylibcudf.libcudf.types cimport size_type


Expand Down
6 changes: 2 additions & 4 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ cpdef void write_json(
"""
cdef table_metadata tbl_meta = table_w_meta.metadata
cdef string na_rep_c = na_rep.encode()
cdef string true_value_c = true_value.encode()
cdef string false_value_c = false_value.encode()

cdef json_writer_options options = move(
json_writer_options.builder(sink_info.c_obj, table_w_meta.tbl.view())
Expand All @@ -63,9 +61,9 @@ cpdef void write_json(
if rows_per_chunk != numeric_limits[size_type].max():
options.set_rows_per_chunk(rows_per_chunk)
if true_value != "true":
options.set_true_value(true_value_c)
options.set_true_value(<string>true_value.encode())
if false_value != "false":
options.set_false_value(false_value_c)
options.set_false_value(<string>false_value.encode())

with nogil:
cpp_write_json(options)
76 changes: 32 additions & 44 deletions python/cudf/cudf/_lib/pylibcudf/io/types.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ cdef class TableWithMetadata:
if not isinstance(name, str):
raise ValueError("Column name must be a string!")

info.name = move(<string> name.encode())
info.name = <string> name.encode()
info.children = move(self._make_column_info(child_names))

col_name_infos.push_back(info)
Expand Down Expand Up @@ -200,60 +200,48 @@ cdef class SinkInfo:

def __init__(self, list sinks):
cdef vector[data_sink *] data_sinks
cdef unique_ptr[data_sink] sink

cdef vector[string] paths

if not sinks:
raise ValueError("Need to pass at least one sink")

if isinstance(sinks[0], io.StringIO):
data_sinks.reserve(len(sinks))
for s in sinks:
if not isinstance(s, io.StringIO):
raise ValueError("All sinks must be of the same type!")
self.sink_storage.push_back(
unique_ptr[data_sink](new iobase_data_sink(s))
)
data_sinks.push_back(self.sink_storage.back().get())
self.c_obj = sink_info(data_sinks)
elif isinstance(sinks[0], io.IOBase):
data_sinks.reserve(len(sinks))
for s in sinks:
# Files opened in text mode expect writes to be str rather than
# bytes, which requires conversion from utf-8. If the underlying
# buffer is utf-8, we can bypass this conversion by writing
# directly to it.
if isinstance(s, io.TextIOBase):
cdef object initial_sink_cls = type(sinks[0])

for s in sinks:
if not isinstance(s, initial_sink_cls):
raise ValueError("All sinks must be of the same type!")
if isinstance(s, str):
paths.reserve(len(sinks))
paths.push_back(<string> s.encode())
elif isinstance(s, os.PathLike):
paths.reserve(len(sinks))
paths.push_back(<string> os.path.expanduser(s).encode())
else:
data_sinks.reserve(len(sinks))
if isinstance(s, (io.StringIO, io.BytesIO)):
self.sink_storage.push_back(
unique_ptr[data_sink](new iobase_data_sink(s))
)
elif isinstance(s, io.TextIOBase):
if codecs.lookup(s.encoding).name not in {
"utf-8",
"ascii",
}:
raise NotImplementedError(f"Unsupported encoding {s.encoding}")
sink = move(unique_ptr[data_sink](new iobase_data_sink(s.buffer)))
elif isinstance(s, io.BytesIO):
sink = move(unique_ptr[data_sink](new iobase_data_sink(s)))
raise NotImplementedError(
f"Unsupported encoding {s.encoding}"
)
self.sink_storage.push_back(
unique_ptr[data_sink](new iobase_data_sink(s.buffer))
)
else:
raise ValueError("All sinks must be of the same type!")
raise TypeError(
"Unrecognized input type: {}".format(type(sinks[0]))
)

self.sink_storage.push_back(
move(sink)
)
data_sinks.push_back(self.sink_storage.back().get())

if data_sinks.size() > 0:
self.c_obj = sink_info(data_sinks)
elif isinstance(sinks[0], str):
paths.reserve(len(sinks))
for s in sinks:
if not isinstance(s, str):
raise ValueError("All sinks must be of the same type!")
paths.push_back(<string> s.encode())
self.c_obj = sink_info(move(paths))
elif isinstance(sinks[0], os.PathLike):
paths.reserve(len(sinks))
for s in sinks:
if not isinstance(s, os.PathLike):
raise ValueError("All sinks must be of the same type!")
paths.push_back(<string> os.path.expanduser(s).encode())
self.c_obj = sink_info(move(paths))
else:
raise TypeError("Unrecognized input type: {}".format(type(sinks[0])))
# we don't have sinks so we must have paths to sinks
self.c_obj = sink_info(paths)
10 changes: 5 additions & 5 deletions python/cudf/cudf/pylibcudf_tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,24 +145,24 @@ def is_fixed_width(plc_dtype: plc.DataType):
)


def nesting(typ) -> tuple[int, int]:
def nesting_level(typ) -> tuple[int, int]:
"""Return list and struct nesting of a pyarrow type."""
if isinstance(typ, pa.ListType):
list_, struct = nesting(typ.value_type)
list_, struct = nesting_level(typ.value_type)
return list_ + 1, struct
elif isinstance(typ, pa.StructType):
lists, structs = map(max, zip(*(nesting(t.type) for t in typ)))
lists, structs = map(max, zip(*(nesting_level(t.type) for t in typ)))
return lists, structs + 1
else:
return 0, 0


def is_nested_struct(typ):
return nesting(typ)[1] > 1
return nesting_level(typ)[1] > 1


def is_nested_list(typ):
return nesting(typ)[0] > 1
return nesting_level(typ)[0] > 1


def sink_to_str(sink):
Expand Down
81 changes: 31 additions & 50 deletions python/cudf/cudf/pylibcudf_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def table_data(request):
"""
nrows = request.param

table_dict = dict()
table_dict = {}
# Colnames in the format expected by
# plc.io.TableWithMetadata
colnames = []
Expand All @@ -63,55 +63,36 @@ def table_data(request):
rand_vals = np.random.randint(0, nrows, nrows)
child_colnames = []

if isinstance(typ, pa.ListType):

def _generate_list_data(typ):
child_colnames = []
if isinstance(typ, pa.ListType):
# recurse to get vals
rand_arrs, grandchild_colnames = _generate_list_data(
typ.value_type
)
pa_array = pa.array(
[list(row_vals) for row_vals in zip(rand_arrs)],
type=typ,
)
child_colnames.append(("", grandchild_colnames))
else:
# typ is scalar type
pa_array = pa.array(rand_vals).cast(typ)
child_colnames.append(("", []))
return pa_array, child_colnames

rand_arr, child_colnames = _generate_list_data(typ)
elif isinstance(typ, pa.StructType):

def _generate_struct_data(typ):
child_colnames = []
if isinstance(typ, pa.StructType):
# recurse to get vals
rand_arrs = []
for i in range(typ.num_fields):
rand_arr, grandchild_colnames = _generate_struct_data(
typ.field(i).type
)
rand_arrs.append(rand_arr)
child_colnames.append(
(typ.field(i).name, grandchild_colnames)
)

pa_array = pa.StructArray.from_arrays(
[rand_arr for rand_arr in rand_arrs],
names=[
typ.field(i).name for i in range(typ.num_fields)
],
)
else:
# typ is scalar type
pa_array = pa.array(rand_vals).cast(typ)
return pa_array, child_colnames

rand_arr, child_colnames = _generate_struct_data(typ)
def _generate_nested_data(typ):
child_colnames = []

# recurse to get vals for children
rand_arrs = []
for i in range(typ.num_fields):
rand_arr, grandchild_colnames = _generate_nested_data(
typ.field(i).type
)
rand_arrs.append(rand_arr)
child_colnames.append((typ.field(i).name, grandchild_colnames))

if isinstance(typ, pa.StructType):
pa_array = pa.StructArray.from_arrays(
[rand_arr for rand_arr in rand_arrs],
names=[typ.field(i).name for i in range(typ.num_fields)],
)
elif isinstance(typ, pa.ListType):
pa_array = pa.array(
[list(row_vals) for row_vals in zip(rand_arrs[0])],
type=typ,
)
child_colnames.append(("", grandchild_colnames))
else:
# typ is scalar type
pa_array = pa.array(rand_vals).cast(typ)
return pa_array, child_colnames

if isinstance(typ, (pa.ListType, pa.StructType)):
rand_arr, child_colnames = _generate_nested_data(typ)
else:
rand_arr = pa.array(rand_vals).cast(typ)

Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/pylibcudf_tests/io/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_write_json_basic(table_data, source_or_sink, lines, rows_per_chunk):
plc_table_w_meta, pa_table = table_data
sink = source_or_sink

kwargs = dict()
kwargs = {}
if rows_per_chunk <= plc_table_w_meta.tbl.num_rows():
kwargs["rows_per_chunk"] = rows_per_chunk

Expand Down

0 comments on commit e940e30

Please sign in to comment.