Add low memory JSON reader for cudf.pandas (#16204)
Fixes: #16122 

This PR introduces low-memory JSON reading for `cudf.pandas` `read_json`: when pandas-compatible mode is enabled and `lines=True`, the source is read in fixed-size byte-range chunks that are concatenated as they arrive, rather than materialized in a single pass.
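
As a quick illustration, here is a minimal sketch of how the new path is exercised from the user-facing API, mirroring the test added in this PR (the in-memory buffer is illustrative):

from io import BytesIO

import cudf

# A small JSON-lines source held in memory.
buf = BytesIO(b'{"a": 1}\n{"a": 2}\n')

# Under pandas-compatible mode, read_json(lines=True) takes the new
# chunked, low-memory path instead of one monolithic read.
with cudf.option_context("mode.pandas_compatible", True):
    gdf = cudf.read_json(buf, lines=True)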

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Shruti Shivakumar (https://github.com/shrshi)

URL: #16204
galipremsagar authored Jul 12, 2024
1 parent 390e6fe commit 954ce6d
Showing 8 changed files with 228 additions and 46 deletions.
3 changes: 2 additions & 1 deletion cpp/src/io/json/read_json.cu
@@ -193,7 +193,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
   size_t chunk_size = reader_opts.get_byte_range_size();
 
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
-               "Invalid offsetting");
+               "Invalid offsetting",
+               std::invalid_argument);
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
   chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
2 changes: 1 addition & 1 deletion cpp/src/io/utilities/datasource.cpp
@@ -217,7 +217,7 @@ class memory_mapped_source : public file_source {
 
   void map(int fd, size_t offset, size_t size)
   {
-    CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");
+    CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error);
 
     // Offset for `mmap()` must be page aligned
     _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);
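
The page-alignment expression retained in the context above rounds an mmap offset down to the start of its page. A tiny worked example of the same bit trick, assuming a 4096-byte page size (the C++ code queries sysconf(_SC_PAGESIZE) instead):

# Round an offset down to the containing page boundary, mirroring
# `offset & ~(sysconf(_SC_PAGESIZE) - 1)` in datasource.cpp.
PAGE_SIZE = 4096  # assumed page size, for illustration only

def page_align(offset: int) -> int:
    return offset & ~(PAGE_SIZE - 1)

assert page_align(10_000) == 8192   # 10_000 lies in the page starting at 8192
assert page_align(4_096) == 4_096   # already-aligned offsets are unchanged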
63 changes: 42 additions & 21 deletions python/cudf/cudf/_lib/json.pyx
@@ -10,14 +10,15 @@ from cudf.core.buffer import acquire_spill_lock
 from libcpp cimport bool
 
 cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types
+from cudf._lib.column cimport Column
 from cudf._lib.io.utils cimport add_df_col_struct_names
 from cudf._lib.pylibcudf.io.types cimport compression_type
 from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
 from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type
 from cudf._lib.pylibcudf.libcudf.types cimport data_type, type_id
 from cudf._lib.pylibcudf.types cimport DataType
 from cudf._lib.types cimport dtype_to_data_type
-from cudf._lib.utils cimport data_from_pylibcudf_io
+from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
 
 import cudf._lib.pylibcudf as plc
 
@@ -98,28 +99,48 @@ cpdef read_json(object filepaths_or_buffers,
     else:
         raise TypeError("`dtype` must be 'list like' or 'dict'")
 
-    table_w_meta = plc.io.json.read_json(
-        plc.io.SourceInfo(filepaths_or_buffers),
-        processed_dtypes,
-        c_compression,
-        lines,
-        byte_range_offset = byte_range[0] if byte_range is not None else 0,
-        byte_range_size = byte_range[1] if byte_range is not None else 0,
-        keep_quotes = keep_quotes,
-        mixed_types_as_string = mixed_types_as_string,
-        prune_columns = prune_columns,
-        recovery_mode = _get_json_recovery_mode(on_bad_lines)
-    )
-
-    df = cudf.DataFrame._from_data(
-        *data_from_pylibcudf_io(
-            table_w_meta
+    if cudf.get_option("mode.pandas_compatible") and lines:
+        res_cols, res_col_names, res_child_names = plc.io.json.chunked_read_json(
+            plc.io.SourceInfo(filepaths_or_buffers),
+            processed_dtypes,
+            c_compression,
+            keep_quotes = keep_quotes,
+            mixed_types_as_string = mixed_types_as_string,
+            prune_columns = prune_columns,
+            recovery_mode = _get_json_recovery_mode(on_bad_lines)
+        )
+        df = cudf.DataFrame._from_data(
+            *_data_from_columns(
+                columns=[Column.from_pylibcudf(plc) for plc in res_cols],
+                column_names=res_col_names,
+                index_names=None
+            )
+        )
+        add_df_col_struct_names(df, res_child_names)
+        return df
+    else:
+        table_w_meta = plc.io.json.read_json(
+            plc.io.SourceInfo(filepaths_or_buffers),
+            processed_dtypes,
+            c_compression,
+            lines,
+            byte_range_offset = byte_range[0] if byte_range is not None else 0,
+            byte_range_size = byte_range[1] if byte_range is not None else 0,
+            keep_quotes = keep_quotes,
+            mixed_types_as_string = mixed_types_as_string,
+            prune_columns = prune_columns,
+            recovery_mode = _get_json_recovery_mode(on_bad_lines)
+        )
+
+        df = cudf.DataFrame._from_data(
+            *data_from_pylibcudf_io(
+                table_w_meta
+            )
         )
-    )
 
-    # Post-processing to add in struct column names
-    add_df_col_struct_names(df, table_w_meta.child_names)
-    return df
+        # Post-processing to add in struct column names
+        add_df_col_struct_names(df, table_w_meta.child_names)
+        return df
 
 
 @acquire_spill_lock()
11 changes: 11 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pxd
@@ -35,3 +35,14 @@ cpdef void write_json(
     str true_value = *,
     str false_value = *
 )
+
+cpdef tuple chunked_read_json(
+    SourceInfo source_info,
+    list dtypes = *,
+    compression_type compression = *,
+    bool keep_quotes = *,
+    bool mixed_types_as_string = *,
+    bool prune_columns = *,
+    json_recovery_mode_t recovery_mode = *,
+    int chunk_size = *,
+)
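
For reference, a sketch of calling the new pylibcudf entry point directly, using the signature declared above; the source path is illustrative, and the return layout matches the implementation below:

import cudf._lib.pylibcudf as plc

# chunk_size is in bytes; each loop iteration reads one byte range of
# this size until the reader runs past the end of the input.
columns, column_names, child_names = plc.io.json.chunked_read_json(
    plc.io.SourceInfo(["data.jsonl"]),  # illustrative path
    chunk_size=100_000_000,
)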
176 changes: 154 additions & 22 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pyx
@@ -6,6 +6,7 @@ from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
+from cudf._lib.pylibcudf.concatenate cimport concatenate
 from cudf._lib.pylibcudf.io.types cimport (
     SinkInfo,
     SourceInfo,
@@ -50,6 +51,144 @@ cdef map[string, schema_element] _generate_schema_map(list dtypes):
     return schema_map
 
 
+cdef json_reader_options _setup_json_reader_options(
+    SourceInfo source_info,
+    list dtypes,
+    compression_type compression,
+    bool lines,
+    size_type byte_range_offset,
+    size_type byte_range_size,
+    bool keep_quotes,
+    bool mixed_types_as_string,
+    bool prune_columns,
+    json_recovery_mode_t recovery_mode):
+
+    cdef vector[data_type] types_vec
+    cdef json_reader_options opts = move(
+        json_reader_options.builder(source_info.c_obj)
+        .compression(compression)
+        .lines(lines)
+        .byte_range_offset(byte_range_offset)
+        .byte_range_size(byte_range_size)
+        .recovery_mode(recovery_mode)
+        .build()
+    )
+
+    if dtypes is not None:
+        if isinstance(dtypes[0], tuple):
+            opts.set_dtypes(move(_generate_schema_map(dtypes)))
+        else:
+            for dtype in dtypes:
+                types_vec.push_back((<DataType>dtype).c_obj)
+            opts.set_dtypes(types_vec)
+
+    opts.enable_keep_quotes(keep_quotes)
+    opts.enable_mixed_types_as_string(mixed_types_as_string)
+    opts.enable_prune_columns(prune_columns)
+    return opts
+
+
+cpdef tuple chunked_read_json(
+    SourceInfo source_info,
+    list dtypes = None,
+    compression_type compression = compression_type.AUTO,
+    bool keep_quotes = False,
+    bool mixed_types_as_string = False,
+    bool prune_columns = False,
+    json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL,
+    int chunk_size=100_000_000,
+):
"""Reads an JSON file into a :py:class:`~.types.TableWithMetadata`.
Parameters
----------
source_info : SourceInfo
The SourceInfo object to read the JSON file from.
dtypes : list, default None
Set data types for the columns in the JSON file.
Each element of the list has the format
(column_name, column_dtype, list of child dtypes), where
the list of child dtypes is an empty list if the child is not
a nested type (list or struct dtype), and is of format
(column_child_name, column_child_type, list of grandchild dtypes).
compression: CompressionType, default CompressionType.AUTO
The compression format of the JSON source.
keep_quotes : bool, default False
Whether the reader should keep quotes of string values.
mixed_types_as_string : bool, default False
If True, mixed type columns are returned as string columns.
If `False` parsing mixed type columns will thrown an error.
prune_columns : bool, default False
Whether to only read columns specified in dtypes.
recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL
Whether to raise an error or set corresponding values to null
when encountering an invalid JSON line.
chunk_size : int, default 100_000_000 bytes.
The number of bytes to be read in chunks.
The chunk_size should be set to at least row_size.
Returns
-------
tuple
A tuple of (columns, column_name, child_names)
"""
+    cdef size_type c_range_size = (
+        chunk_size if chunk_size is not None else 0
+    )
+    cdef json_reader_options opts = _setup_json_reader_options(
+        source_info=source_info,
+        dtypes=dtypes,
+        compression=compression,
+        lines=True,
+        byte_range_offset=0,
+        byte_range_size=0,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        recovery_mode=recovery_mode,
+    )
+
+    # Read JSON
+    cdef table_with_metadata c_result
+
+    final_columns = []
+    meta_names = None
+    child_names = None
+    i = 0
+    while True:
+        opts.set_byte_range_offset(c_range_size * i)
+        opts.set_byte_range_size(c_range_size)
+
+        try:
+            with nogil:
+                c_result = move(cpp_read_json(opts))
+        except (ValueError, OverflowError):
+            break
+        if meta_names is None:
+            meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
+        if child_names is None:
+            child_names = TableWithMetadata._parse_col_names(
+                c_result.metadata.schema_info
+            )
+        new_chunk = [
+            col for col in TableWithMetadata.from_libcudf(
+                c_result).columns
+        ]
+
+        if len(final_columns) == 0:
+            final_columns = new_chunk
+        else:
+            for col_idx in range(len(meta_names)):
+                final_columns[col_idx] = concatenate(
+                    [final_columns[col_idx], new_chunk[col_idx]]
+                )
+                # Must drop any residual GPU columns to save memory
+                new_chunk[col_idx] = None
+        i += 1
+    return (final_columns, meta_names, child_names)


cpdef TableWithMetadata read_json(
SourceInfo source_info,
list dtypes = None,
@@ -76,14 +215,17 @@ cpdef TableWithMetadata read_json(
         the list of child dtypes is an empty list if the child is not
         a nested type (list or struct dtype), and is of format
         (column_child_name, column_child_type, list of grandchild dtypes).
-    compression_type: CompressionType, default CompressionType.AUTO
+    compression: CompressionType, default CompressionType.AUTO
         The compression format of the JSON source.
     byte_range_offset : size_type, default 0
         Number of bytes to skip from source start.
     byte_range_size : size_type, default 0
         Number of bytes to read. By default, will read all bytes.
     keep_quotes : bool, default False
         Whether the reader should keep quotes of string values.
+    mixed_types_as_string : bool, default False
+        If True, mixed type columns are returned as string columns.
+        If `False` parsing mixed type columns will throw an error.
     prune_columns : bool, default False
        Whether to only read columns specified in dtypes.
     recovery_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL
"""
cdef vector[data_type] types_vec
cdef json_reader_options opts = move(
json_reader_options.builder(source_info.c_obj)
.compression(compression)
.lines(lines)
.byte_range_offset(byte_range_offset)
.byte_range_size(byte_range_size)
.recovery_mode(recovery_mode)
.build()
cdef json_reader_options opts = _setup_json_reader_options(
source_info=source_info,
dtypes=dtypes,
compression=compression,
lines=lines,
byte_range_offset=byte_range_offset,
byte_range_size=byte_range_size,
keep_quotes=keep_quotes,
mixed_types_as_string=mixed_types_as_string,
prune_columns=prune_columns,
recovery_mode=recovery_mode,
)

if dtypes is not None:
if isinstance(dtypes[0], tuple):
opts.set_dtypes(move(_generate_schema_map(dtypes)))
else:
for dtype in dtypes:
types_vec.push_back((<DataType>dtype).c_obj)
opts.set_dtypes(types_vec)

opts.enable_keep_quotes(keep_quotes)
opts.enable_mixed_types_as_string(mixed_types_as_string)
opts.enable_prune_columns(prune_columns)

# Read JSON
cdef table_with_metadata c_result

Expand Down
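
The chunked loop above terminates by catching the error libcudf raises once the requested byte-range offset moves past the end of the input; that is why the C++ changes in this PR switch the CUDF_EXPECTS checks to std::invalid_argument and std::overflow_error, which Cython surfaces in Python as ValueError and OverflowError. A minimal pure-Python sketch of the same chunk-and-concatenate control flow, with all names illustrative and record alignment elided (libcudf re-aligns byte ranges to record boundaries internally):

def read_byte_range(data: bytes, offset: int, size: int) -> list[bytes]:
    # Stand-in for cpp_read_json with byte_range_offset/byte_range_size
    # set: raise once the offset is past the end of the input.
    if offset >= len(data):
        raise OverflowError("Offset is past end of file")
    return data[offset:offset + size].splitlines()

def chunked_read(data: bytes, chunk_size: int) -> list[bytes]:
    rows, i = [], 0
    while True:
        try:
            # Each iteration reads one fixed-size byte range...
            chunk = read_byte_range(data, i * chunk_size, chunk_size)
        except OverflowError:
            break  # ...until the offset runs past the end of input.
        rows.extend(chunk)  # concatenate chunks as they arrive
        i += 1
    return rows

# Three 9-byte records, read with a 9-byte chunk size.
print(chunked_read(b'{"a": 1}\n{"a": 2}\n{"a": 3}\n', chunk_size=9))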
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/utils.pxd
@@ -19,3 +19,4 @@ cdef table_view table_view_from_table(tbl, ignore_index=*) except*
 cdef columns_from_unique_ptr(unique_ptr[table] c_tbl)
 cdef columns_from_table_view(table_view tv, object owners)
 cdef columns_from_pylibcudf_table(tbl)
+cdef _data_from_columns(columns, column_names, index_names=*)
2 changes: 1 addition & 1 deletion python/cudf/cudf/tests/test_csv.py
@@ -1191,7 +1191,7 @@ def test_csv_reader_byte_range_type_corner_case(tmpdir):
     ).to_csv(fname, chunksize=100000)
 
     byte_range = (2_147_483_648, 0)
-    with pytest.raises(RuntimeError, match="Offset is past end of file"):
+    with pytest.raises(OverflowError, match="Offset is past end of file"):
         cudf.read_csv(fname, byte_range=byte_range, header=None)
16 changes: 16 additions & 0 deletions python/cudf/cudf/tests/test_json.py
@@ -1428,3 +1428,19 @@ def test_json_reader_on_bad_lines(on_bad_lines):
         orient="records",
         on_bad_lines=on_bad_lines,
     )
+
+
+def test_chunked_json_reader():
+    df = cudf.DataFrame(
+        {
+            "a": ["aaaa"] * 9_000_000,
+            "b": list(range(0, 9_000_000)),
+        }
+    )
+    buf = BytesIO()
+    df.to_json(buf, lines=True, orient="records", engine="cudf")
+    buf.seek(0)
+    df = df.to_pandas()
+    with cudf.option_context("mode.pandas_compatible", True):
+        gdf = cudf.read_json(buf, lines=True)
+    assert_eq(df, gdf)
