Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement more copying APIs in pylibcudf #14508

Merged
merged 21 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/include/cudf/copying.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ std::unique_ptr<table> scatter(
/**
* @brief Indicates when to allocate a mask, based on an existing mask.
*/
enum class mask_allocation_policy {
enum class mask_allocation_policy : int32_t {
vyasr marked this conversation as resolved.
Show resolved Hide resolved
NEVER, ///< Do not allocate a null mask, regardless of input
RETAIN, ///< Allocate a null mask if the input contains one
ALWAYS ///< Allocate a null mask, regardless of input
Expand Down
255 changes: 59 additions & 196 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pickle

from libc.stdint cimport int32_t, uint8_t, uintptr_t
from libc.stdint cimport uint8_t, uintptr_t
from libcpp cimport bool
from libcpp.memory cimport make_shared, shared_ptr, unique_ptr
from libcpp.utility cimport move
Expand All @@ -24,7 +24,6 @@ from cudf._lib.utils cimport table_view_from_columns, table_view_from_table
from cudf._lib.reduce import minmax
from cudf.core.abc import Serializable

from libcpp.functional cimport reference_wrapper
from libcpp.memory cimport make_unique

cimport cudf._lib.cpp.contiguous_split as cpp_contiguous_split
Expand All @@ -36,13 +35,11 @@ from cudf._lib.cpp.lists.gather cimport (
)
from cudf._lib.cpp.lists.lists_column_view cimport lists_column_view
from cudf._lib.cpp.scalar.scalar cimport scalar
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport size_type
from cudf._lib.utils cimport (
columns_from_pylibcudf_table,
columns_from_table_view,
columns_from_unique_ptr,
data_from_table_view,
table_view_from_columns,
)
Expand Down Expand Up @@ -116,25 +113,15 @@ def _copy_range(Column input_column,
size_type input_begin,
size_type input_end,
size_type target_begin):

cdef column_view input_column_view = input_column.view()
cdef column_view target_column_view = target_column.view()
cdef size_type c_input_begin = input_begin
cdef size_type c_input_end = input_end
cdef size_type c_target_begin = target_begin

cdef unique_ptr[column] c_result

with nogil:
c_result = move(cpp_copying.copy_range(
input_column_view,
target_column_view,
c_input_begin,
c_input_end,
c_target_begin)
return Column.from_pylibcudf(
pylibcudf.copying.copy_range(
input_column.to_pylibcudf(mode="read"),
target_column.to_pylibcudf(mode="read"),
input_begin,
input_end,
target_begin
)

return Column.from_unique_ptr(move(c_result))
)


@acquire_spill_lock()
Expand Down Expand Up @@ -184,48 +171,6 @@ def gather(
return columns_from_pylibcudf_table(tbl)


cdef scatter_scalar(list source_device_slrs,
column_view scatter_map,
table_view target_table):
cdef vector[reference_wrapper[constscalar]] c_source
cdef DeviceScalar d_slr
cdef unique_ptr[table] c_result

c_source.reserve(len(source_device_slrs))
for d_slr in source_device_slrs:
c_source.push_back(
reference_wrapper[constscalar](d_slr.get_raw_ptr()[0])
)

with nogil:
c_result = move(
cpp_copying.scatter(
c_source,
scatter_map,
target_table,
)
)

return columns_from_unique_ptr(move(c_result))


cdef scatter_column(list source_columns,
column_view scatter_map,
table_view target_table):
cdef table_view c_source = table_view_from_columns(source_columns)
cdef unique_ptr[table] c_result

with nogil:
c_result = move(
cpp_copying.scatter(
c_source,
scatter_map,
target_table,
)
)
return columns_from_unique_ptr(move(c_result))


@acquire_spill_lock()
def scatter(list sources, Column scatter_map, list target_columns,
bool bounds_check=True):
Expand All @@ -243,9 +188,6 @@ def scatter(list sources, Column scatter_map, list target_columns,
if len(sources) == 0:
return []

cdef column_view scatter_map_view = scatter_map.view()
cdef table_view target_table_view = table_view_from_columns(target_columns)

if bounds_check:
n_rows = len(target_columns[0])
if not (
Expand All @@ -257,62 +199,47 @@ def scatter(list sources, Column scatter_map, list target_columns,
)

if isinstance(sources[0], Column):
return scatter_column(
sources, scatter_map_view, target_table_view
tbl = pylibcudf.copying.scatter_table(
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in sources]),
scatter_map.to_pylibcudf(mode="read"),
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in target_columns]),
)
else:
source_scalars = [as_device_scalar(slr) for slr in sources]
return scatter_scalar(
source_scalars, scatter_map_view, target_table_view
tbl = pylibcudf.copying.scatter_scalars(
[(<DeviceScalar> as_device_scalar(slr)).c_value for slr in sources],
scatter_map.to_pylibcudf(mode="read"),
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in target_columns]),
)

return columns_from_pylibcudf_table(tbl)


@acquire_spill_lock()
def column_empty_like(Column input_column):

cdef column_view input_column_view = input_column.view()
cdef unique_ptr[column] c_result

with nogil:
c_result = move(cpp_copying.empty_like(input_column_view))

return Column.from_unique_ptr(move(c_result))
return Column.from_pylibcudf(
pylibcudf.copying.empty_column_like(
input_column.to_pylibcudf(mode="read")
)
)


@acquire_spill_lock()
def column_allocate_like(Column input_column, size=None):

cdef size_type c_size = 0
cdef column_view input_column_view = input_column.view()
cdef unique_ptr[column] c_result

if size is None:
with nogil:
c_result = move(cpp_copying.allocate_like(
input_column_view,
cpp_copying.mask_allocation_policy.RETAIN)
)
else:
c_size = size
with nogil:
c_result = move(cpp_copying.allocate_like(
input_column_view,
c_size,
cpp_copying.mask_allocation_policy.RETAIN)
)

return Column.from_unique_ptr(move(c_result))
return Column.from_pylibcudf(
pylibcudf.copying.allocate_like(
input_column.to_pylibcudf(mode="read"),
size,
)
)


@acquire_spill_lock()
def columns_empty_like(list input_columns):
cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef unique_ptr[table] c_result

with nogil:
c_result = move(cpp_copying.empty_like(input_table_view))

return columns_from_unique_ptr(move(c_result))
return columns_from_pylibcudf_table(
pylibcudf.copying.empty_table_like(
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in input_columns])
)
)


@acquire_spill_lock()
Expand Down Expand Up @@ -513,70 +440,15 @@ def _copy_if_else_scalar_scalar(DeviceScalar lhs,

@acquire_spill_lock()
def copy_if_else(object lhs, object rhs, Column boolean_mask):

if isinstance(lhs, Column):
if isinstance(rhs, Column):
return _copy_if_else_column_column(lhs, rhs, boolean_mask)
else:
return _copy_if_else_column_scalar(
lhs, as_device_scalar(rhs), boolean_mask)
else:
if isinstance(rhs, Column):
return _copy_if_else_scalar_column(
as_device_scalar(lhs), rhs, boolean_mask)
else:
if lhs is None and rhs is None:
return lhs

return _copy_if_else_scalar_scalar(
as_device_scalar(lhs), as_device_scalar(rhs), boolean_mask)


def _boolean_mask_scatter_columns(list input_columns, list target_columns,
Column boolean_mask):

cdef table_view input_table_view = table_view_from_columns(input_columns)
cdef table_view target_table_view = table_view_from_columns(target_columns)
cdef column_view boolean_mask_view = boolean_mask.view()

cdef unique_ptr[table] c_result

with nogil:
c_result = move(
cpp_copying.boolean_mask_scatter(
input_table_view,
target_table_view,
boolean_mask_view
)
)

return columns_from_unique_ptr(move(c_result))


def _boolean_mask_scatter_scalar(list input_scalars, list target_columns,
Column boolean_mask):

cdef vector[reference_wrapper[constscalar]] input_scalar_vector
input_scalar_vector.reserve(len(input_scalars))
cdef DeviceScalar scl
for scl in input_scalars:
input_scalar_vector.push_back(reference_wrapper[constscalar](
scl.get_raw_ptr()[0]))
cdef table_view target_table_view = table_view_from_columns(target_columns)
cdef column_view boolean_mask_view = boolean_mask.view()

cdef unique_ptr[table] c_result

with nogil:
c_result = move(
cpp_copying.boolean_mask_scatter(
input_scalar_vector,
target_table_view,
boolean_mask_view
)
return Column.from_pylibcudf(
pylibcudf.copying.copy_if_else(
lhs.to_pylibcudf(mode="read") if isinstance(lhs, Column)
else (<DeviceScalar> as_device_scalar(lhs)).c_value,
rhs.to_pylibcudf(mode="read") if isinstance(rhs, Column)
else (<DeviceScalar> as_device_scalar(rhs)).c_value,
boolean_mask.to_pylibcudf(mode="read"),
)

return columns_from_unique_ptr(move(c_result))
)


@acquire_spill_lock()
Expand All @@ -598,45 +470,36 @@ def boolean_mask_scatter(list input_, list target_columns,
return []

if isinstance(input_[0], Column):
return _boolean_mask_scatter_columns(
input_,
target_columns,
boolean_mask
tbl = pylibcudf.copying.boolean_mask_table_scatter(
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in input_]),
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in target_columns]),
boolean_mask.to_pylibcudf(mode="read"),
)
else:
scalar_list = [as_device_scalar(i) for i in input_]
return _boolean_mask_scatter_scalar(
scalar_list,
target_columns,
boolean_mask
tbl = pylibcudf.copying.boolean_mask_scalars_scatter(
[(<DeviceScalar> as_device_scalar(i)).c_value for i in input_],
pylibcudf.Table([col.to_pylibcudf(mode="read") for col in target_columns]),
boolean_mask.to_pylibcudf(mode="read"),
)

return columns_from_pylibcudf_table(tbl)


@acquire_spill_lock()
def shift(Column input, int offset, object fill_value=None):

cdef DeviceScalar fill

if isinstance(fill_value, DeviceScalar):
fill = fill_value
else:
fill = as_device_scalar(fill_value, input.dtype)

cdef column_view c_input = input.view()
cdef int32_t c_offset = offset
cdef const scalar* c_fill_value = fill.get_raw_ptr()
cdef unique_ptr[column] c_output

with nogil:
c_output = move(
cpp_copying.shift(
c_input,
c_offset,
c_fill_value[0]
)
)

return Column.from_unique_ptr(move(c_output))
col = pylibcudf.copying.shift(
input.to_pylibcudf(mode="read"),
offset,
fill.c_value,
)
return Column.from_pylibcudf(col)


@acquire_spill_lock()
Expand Down
Loading
Loading