Add low memory JSON reader for cudf.pandas #16204

Merged · 15 commits · Jul 12, 2024
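
For context, the change routes cudf.read_json through a new chunked, low-memory path whenever pandas-compatible mode is enabled and lines=True; everything else falls back to the existing single-pass reader. A minimal usage sketch (the file name is hypothetical):

    import cudf

    with cudf.option_context("mode.pandas_compatible", True):
        # lines=True is required for the chunked path; other combinations
        # take the existing single-pass read_json route.
        df = cudf.read_json("records.jsonl", lines=True)
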
3 changes: 2 additions & 1 deletion cpp/src/io/json/read_json.cu
@@ -193,7 +193,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
   size_t chunk_size = reader_opts.get_byte_range_size();
 
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
-               "Invalid offsetting");
+               "Invalid offsetting",
+               std::invalid_argument);
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
   chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
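
Switching from the default exception type to std::invalid_argument here (and to std::overflow_error in datasource.cpp below) feeds directly into the new chunked reader: Cython translates those C++ types into Python's ValueError and OverflowError, which the reader catches to detect that it has walked past the end of the input. A minimal Python model of that control flow, with an invented read_chunk stand-in:

    def read_chunk(offset, total_size):
        # Stand-in for the CUDF_EXPECTS check above: raise the Python
        # translation of std::invalid_argument, not a generic RuntimeError.
        if total_size and offset >= total_size:
            raise ValueError("Invalid offsetting")
        return "parsed chunk"

    try:
        read_chunk(offset=200, total_size=100)
    except (ValueError, OverflowError):
        print("past end of input; stop requesting chunks")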

2 changes: 1 addition & 1 deletion cpp/src/io/utilities/datasource.cpp
@@ -217,7 +217,7 @@ class memory_mapped_source : public file_source {

   void map(int fd, size_t offset, size_t size)
   {
-    CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");
+    CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error);
 
     // Offset for `mmap()` must be page aligned
     _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);
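
As an aside on the unchanged context line above: mmap() requires a page-aligned offset, so the requested offset is rounded down by masking off its low bits. A quick illustration, assuming a 4 KiB page size:

    PAGE_SIZE = 4096  # sysconf(_SC_PAGESIZE) on typical Linux systems

    offset = 10_000
    map_offset = offset & ~(PAGE_SIZE - 1)  # clear low bits: round down to page start
    assert map_offset == 8_192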
63 changes: 42 additions & 21 deletions python/cudf/cudf/_lib/json.pyx
@@ -10,14 +10,15 @@ from cudf.core.buffer import acquire_spill_lock
 from libcpp cimport bool
 
 cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types
+from cudf._lib.column cimport Column
 from cudf._lib.io.utils cimport add_df_col_struct_names
 from cudf._lib.pylibcudf.io.types cimport compression_type
 from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
 from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type
 from cudf._lib.pylibcudf.libcudf.types cimport data_type, type_id
 from cudf._lib.pylibcudf.types cimport DataType
 from cudf._lib.types cimport dtype_to_data_type
-from cudf._lib.utils cimport data_from_pylibcudf_io
+from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
 
 import cudf._lib.pylibcudf as plc

@@ -98,28 +99,48 @@ cpdef read_json(object filepaths_or_buffers,
     else:
         raise TypeError("`dtype` must be 'list like' or 'dict'")
 
-    table_w_meta = plc.io.json.read_json(
-        plc.io.SourceInfo(filepaths_or_buffers),
-        processed_dtypes,
-        c_compression,
-        lines,
-        byte_range_offset = byte_range[0] if byte_range is not None else 0,
-        byte_range_size = byte_range[1] if byte_range is not None else 0,
-        keep_quotes = keep_quotes,
-        mixed_types_as_string = mixed_types_as_string,
-        prune_columns = prune_columns,
-        recovery_mode = _get_json_recovery_mode(on_bad_lines)
-    )
-
-    df = cudf.DataFrame._from_data(
-        *data_from_pylibcudf_io(
-            table_w_meta
-        )
-    )
-
-    # Post-processing to add in struct column names
-    add_df_col_struct_names(df, table_w_meta.child_names)
-    return df
+    if cudf.get_option("mode.pandas_compatible") and lines:
+        res_cols, res_col_names, res_child_names = plc.io.json.chunked_read_json(
+            plc.io.SourceInfo(filepaths_or_buffers),
+            processed_dtypes,
+            c_compression,
+            keep_quotes = keep_quotes,
+            mixed_types_as_string = mixed_types_as_string,
+            prune_columns = prune_columns,
+            recovery_mode = _get_json_recovery_mode(on_bad_lines)
+        )
+        df = cudf.DataFrame._from_data(
+            *_data_from_columns(
+                columns=[Column.from_pylibcudf(plc) for plc in res_cols],
+                column_names=res_col_names,
+                index_names=None
+            )
+        )
+        add_df_col_struct_names(df, res_child_names)
+        return df
+    else:
+        table_w_meta = plc.io.json.read_json(
+            plc.io.SourceInfo(filepaths_or_buffers),
+            processed_dtypes,
+            c_compression,
+            lines,
+            byte_range_offset = byte_range[0] if byte_range is not None else 0,
+            byte_range_size = byte_range[1] if byte_range is not None else 0,
+            keep_quotes = keep_quotes,
+            mixed_types_as_string = mixed_types_as_string,
+            prune_columns = prune_columns,
+            recovery_mode = _get_json_recovery_mode(on_bad_lines)
+        )
+
+        df = cudf.DataFrame._from_data(
+            *data_from_pylibcudf_io(
+                table_w_meta
+            )
+        )
+
+        # Post-processing to add in struct column names
+        add_df_col_struct_names(df, table_w_meta.child_names)
+        return df


@acquire_spill_lock()
11 changes: 11 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pxd
@@ -35,3 +35,14 @@ cpdef void write_json(
     str true_value = *,
     str false_value = *
 )
+
+cpdef tuple chunked_read_json(
+    SourceInfo source_info,
+    list dtypes = *,
+    compression_type compression = *,
+    bool keep_quotes = *,
+    bool mixed_types_as_string = *,
+    bool prune_columns = *,
+    json_recovery_mode_t recovery_mode = *,
+    int chunk_size = *,
+)
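
A hedged sketch of calling the new entry point directly, using the signature declared above (the in-memory source and chunk size are illustrative, not from the PR):

    import io

    import cudf._lib.pylibcudf as plc

    buf = io.BytesIO(b'{"a": 1}\n{"a": 2}\n')
    columns, column_names, child_names = plc.io.json.chunked_read_json(
        plc.io.SourceInfo([buf]),
        chunk_size=10_000_000,  # bytes per byte-range pass
    )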
176 changes: 154 additions & 22 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pyx
@@ -6,6 +6,7 @@ from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
+from cudf._lib.pylibcudf.concatenate cimport concatenate
 from cudf._lib.pylibcudf.io.types cimport (
     SinkInfo,
     SourceInfo,

@@ -50,6 +51,144 @@ cdef map[string, schema_element] _generate_schema_map(list dtypes):
     return schema_map


+cdef json_reader_options _setup_json_reader_options(
+    SourceInfo source_info,
+    list dtypes,
+    compression_type compression,
+    bool lines,
+    size_type byte_range_offset,
+    size_type byte_range_size,
+    bool keep_quotes,
+    bool mixed_types_as_string,
+    bool prune_columns,
+    json_recovery_mode_t recovery_mode):
+
+    cdef vector[data_type] types_vec
+    cdef json_reader_options opts = move(
+        json_reader_options.builder(source_info.c_obj)
+        .compression(compression)
+        .lines(lines)
+        .byte_range_offset(byte_range_offset)
+        .byte_range_size(byte_range_size)
+        .recovery_mode(recovery_mode)
+        .build()
+    )
+
+    if dtypes is not None:
+        if isinstance(dtypes[0], tuple):
+            opts.set_dtypes(move(_generate_schema_map(dtypes)))
+        else:
+            for dtype in dtypes:
+                types_vec.push_back((<DataType>dtype).c_obj)
+            opts.set_dtypes(types_vec)
+
+    opts.enable_keep_quotes(keep_quotes)
+    opts.enable_mixed_types_as_string(mixed_types_as_string)
+    opts.enable_prune_columns(prune_columns)
+    return opts


+cpdef tuple chunked_read_json(
+    SourceInfo source_info,
+    list dtypes = None,
+    compression_type compression = compression_type.AUTO,
+    bool keep_quotes = False,
+    bool mixed_types_as_string = False,
+    bool prune_columns = False,
+    json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL,
+    int chunk_size = 100_000_000,
+):
"""Reads an JSON file into a :py:class:`~.types.TableWithMetadata`.
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved

+    Parameters
+    ----------
+    source_info : SourceInfo
+        The SourceInfo object to read the JSON file from.
+    dtypes : list, default None
+        Set data types for the columns in the JSON file.
+
+        Each element of the list has the format
+        (column_name, column_dtype, list of child dtypes), where
+        the list of child dtypes is an empty list if the child is not
+        a nested type (list or struct dtype), and is of format
+        (column_child_name, column_child_type, list of grandchild dtypes).
+    compression : CompressionType, default CompressionType.AUTO
+        The compression format of the JSON source.
+    keep_quotes : bool, default False
+        Whether the reader should keep quotes of string values.
+    mixed_types_as_string : bool, default False
+        If True, mixed type columns are returned as string columns.
+        If False, parsing mixed type columns will throw an error.
+    prune_columns : bool, default False
+        Whether to only read columns specified in dtypes.
+    recovery_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL
+        Whether to raise an error or set corresponding values to null
+        when encountering an invalid JSON line.
+    chunk_size : int, default 100_000_000
+        The number of bytes to read per chunk; should be at least as
+        large as the largest row in the source.
+
+    Returns
+    -------
+    tuple
+        A tuple of (columns, column_names, child_names).
+    """
+    cdef size_type c_range_size = (
+        chunk_size if chunk_size is not None else 0
+    )
+    cdef json_reader_options opts = _setup_json_reader_options(
+        source_info=source_info,
+        dtypes=dtypes,
+        compression=compression,
+        lines=True,
+        byte_range_offset=0,
+        byte_range_size=0,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        recovery_mode=recovery_mode,
+    )
+
+    # Read JSON
+    cdef table_with_metadata c_result
+
+    final_columns = []
+    meta_names = None
+    child_names = None
+    i = 0
+    while True:
+        opts.set_byte_range_offset(c_range_size * i)
+        opts.set_byte_range_size(c_range_size)
+
+        try:
+            with nogil:
+                c_result = move(cpp_read_json(opts))
+        except (ValueError, OverflowError):
+            break
+        if meta_names is None:
+            meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
+        if child_names is None:
+            child_names = TableWithMetadata._parse_col_names(
+                c_result.metadata.schema_info
+            )
+        new_chunk = [
+            col for col in TableWithMetadata.from_libcudf(
+                c_result).columns
+        ]
+
+        if len(final_columns) == 0:
+            final_columns = new_chunk
+        else:
+            for col_idx in range(len(meta_names)):
+                final_columns[col_idx] = concatenate(
+                    [final_columns[col_idx], new_chunk[col_idx]]
+                )
+                # Must drop any residual GPU columns to save memory
+                new_chunk[col_idx] = None
+        i += 1
+    return (final_columns, meta_names, child_names)
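
The loop above is the core of the low-memory strategy: slide a fixed-size byte range across the source, parse each range, concatenate the per-chunk columns onto the running result, and drop each chunk as soon as it has been merged. A simplified pure-Python model of the accumulation step (plain lists stand in for GPU columns):

    def accumulate(chunks):
        # Each element of `chunks` plays the role of the columns parsed
        # from one byte range of the source.
        final_columns = []
        for chunk_columns in chunks:
            if not final_columns:
                final_columns = chunk_columns
            else:
                for i in range(len(final_columns)):
                    # concatenate, then release the chunk column to cap memory
                    final_columns[i] = final_columns[i] + chunk_columns[i]
                    chunk_columns[i] = None
        return final_columns

    print(accumulate([[["a"], [1]], [["b"], [2]]]))  # [['a', 'b'], [1, 2]]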


 cpdef TableWithMetadata read_json(
     SourceInfo source_info,
     list dtypes = None,

@@ -76,14 +215,17 @@ cpdef TableWithMetadata read_json(
         the list of child dtypes is an empty list if the child is not
         a nested type (list or struct dtype), and is of format
         (column_child_name, column_child_type, list of grandchild dtypes).
-    compression_type: CompressionType, default CompressionType.AUTO
+    compression : CompressionType, default CompressionType.AUTO
         The compression format of the JSON source.
     byte_range_offset : size_type, default 0
         Number of bytes to skip from source start.
     byte_range_size : size_type, default 0
         Number of bytes to read. By default, will read all bytes.
     keep_quotes : bool, default False
         Whether the reader should keep quotes of string values.
+    mixed_types_as_string : bool, default False
+        If True, mixed type columns are returned as string columns.
+        If False, parsing mixed type columns will throw an error.
     prune_columns : bool, default False
         Whether to only read columns specified in dtypes.
     recovery_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL
@@ -95,29 +237,19 @@ cpdef TableWithMetadata read_json(
     TableWithMetadata
         The Table and its corresponding metadata (column names) that were read in.
     """
-    cdef vector[data_type] types_vec
-    cdef json_reader_options opts = move(
-        json_reader_options.builder(source_info.c_obj)
-        .compression(compression)
-        .lines(lines)
-        .byte_range_offset(byte_range_offset)
-        .byte_range_size(byte_range_size)
-        .recovery_mode(recovery_mode)
-        .build()
+    cdef json_reader_options opts = _setup_json_reader_options(
+        source_info=source_info,
+        dtypes=dtypes,
+        compression=compression,
+        lines=lines,
+        byte_range_offset=byte_range_offset,
+        byte_range_size=byte_range_size,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        recovery_mode=recovery_mode,
     )
-
-    if dtypes is not None:
-        if isinstance(dtypes[0], tuple):
-            opts.set_dtypes(move(_generate_schema_map(dtypes)))
-        else:
-            for dtype in dtypes:
-                types_vec.push_back((<DataType>dtype).c_obj)
-            opts.set_dtypes(types_vec)
-
-    opts.enable_keep_quotes(keep_quotes)
-    opts.enable_mixed_types_as_string(mixed_types_as_string)
-    opts.enable_prune_columns(prune_columns)
 
     # Read JSON
     cdef table_with_metadata c_result

1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/utils.pxd
@@ -19,3 +19,4 @@ cdef table_view table_view_from_table(tbl, ignore_index=*) except*
 cdef columns_from_unique_ptr(unique_ptr[table] c_tbl)
 cdef columns_from_table_view(table_view tv, object owners)
 cdef columns_from_pylibcudf_table(tbl)
+cdef _data_from_columns(columns, column_names, index_names=*)
2 changes: 1 addition & 1 deletion python/cudf/cudf/tests/test_csv.py
@@ -1191,7 +1191,7 @@ def test_csv_reader_byte_range_type_corner_case(tmpdir):
     ).to_csv(fname, chunksize=100000)
 
     byte_range = (2_147_483_648, 0)
-    with pytest.raises(RuntimeError, match="Offset is past end of file"):
+    with pytest.raises(OverflowError, match="Offset is past end of file"):
         cudf.read_csv(fname, byte_range=byte_range, header=None)


16 changes: 16 additions & 0 deletions python/cudf/cudf/tests/test_json.py
@@ -1428,3 +1428,19 @@ def test_json_reader_on_bad_lines(on_bad_lines):
orient="records",
on_bad_lines=on_bad_lines,
)


def test_chunked_json_reader():
df = cudf.DataFrame(
{
"a": ["aaaa"] * 9_00_00_00,
"b": list(range(0, 9_00_00_00)),
}
)
buf = BytesIO()
df.to_json(buf, lines=True, orient="records", engine="cudf")
buf.seek(0)
df = df.to_pandas()
with cudf.option_context("mode.pandas_compatible", True):
gdf = cudf.read_json(buf, lines=True)
assert_eq(df, gdf)
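
A back-of-the-envelope check that this test really spans several chunks under the default 100_000_000-byte chunk size (the per-row byte count is approximate):

    rows = 9_000_000
    bytes_per_row = len(b'{"a":"aaaa","b":8999999}\n')  # 25 bytes per record
    total_bytes = rows * bytes_per_row                  # 225,000,000 bytes
    n_chunks = -(-total_bytes // 100_000_000)           # ceil division: 3 passes
    print(total_bytes, n_chunks)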