Skip to content

Commit

Permalink
Implement mixed equality/conditional joins (#9917)
Browse files Browse the repository at this point in the history
This PR implements mixed equality/inequality joins for inner, left, and full joins. This resolves #9696 and contributes to #5401. For the moment, all APIs are functional only, but an object-oriented API is planned to support caching of the hash table.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Yunsong Wang (https://github.com/PointKernel)
  - Jason Lowe (https://github.com/jlowe)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9917
  • Loading branch information
vyasr authored Jan 18, 2022
1 parent 7ff5f12 commit e4a16ae
Show file tree
Hide file tree
Showing 15 changed files with 1,884 additions and 66 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ add_library(
src/jit/parser.cpp
src/jit/type.cpp
src/join/conditional_join.cu
src/join/mixed_join.cu
src/join/cross_join.cu
src/join/hash_join.cu
src/join/join.cu
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/ast/detail/expression_evaluator.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ struct expression_evaluator {
__device__ __forceinline__ void evaluate(
expression_result<ResultSubclass, T, result_has_nulls>& output_object,
cudf::size_type const row_index,
IntermediateDataType<has_nulls>* thread_intermediate_storage)
IntermediateDataType<has_nulls>* thread_intermediate_storage) const
{
evaluate(output_object, row_index, row_index, row_index, thread_intermediate_storage);
}
Expand All @@ -452,7 +452,7 @@ struct expression_evaluator {
cudf::size_type const left_row_index,
cudf::size_type const right_row_index,
cudf::size_type const output_row_index,
IntermediateDataType<has_nulls>* thread_intermediate_storage)
IntermediateDataType<has_nulls>* thread_intermediate_storage) const
{
cudf::size_type operator_source_index{0};
for (cudf::size_type operator_index = 0; operator_index < plan.operators.size();
Expand Down
267 changes: 266 additions & 1 deletion cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cudf/ast/expressions.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
Expand Down Expand Up @@ -701,7 +702,7 @@ conditional_inner_join(
* The first returned vector contains all the row indices from the left
* table (in unspecified order). The corresponding value in the
* second returned vector is either (1) the row index of the matched row
* from the right table, if there is a match or (2) an unspecified
* from the right table, if there is a match or (2) an unspecified
* out-of-bounds value.
*
* If the provided predicate returns NULL for a pair of rows
Expand Down Expand Up @@ -858,6 +859,270 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
std::optional<std::size_t> output_size = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Returns a pair of row index vectors corresponding to all pairs of
* rows between the specified tables where the columns of the equality table
* are equal and the predicate evaluates to true on the conditional tables.
*
* The first returned vector contains the row indices from the left
* table that have a match in the right table (in unspecified order).
* The corresponding values in the second returned vector are
* the matched row indices from the right table.
*
* If the provided predicate returns NULL for a pair of rows
* (left, right), that pair is not included in the output. It is the user's
* responsibility to choose a suitable compare_nulls value AND use appropriate
* null-safe operators in the expression.
*
* If the provided output size or per-row counts are incorrect, behavior is undefined.
*
* @code{.pseudo}
* left_equality: {{0, 1, 2}}
* right_equality: {{1, 2, 3}}
* left_conditional: {{4, 4, 4}}
* right_conditional: {{3, 4, 5}}
* Expression: Left.Column_0 > Right.Column_0
* Result: {{1}, {0}}
* @endcode
*
* @throw cudf::logic_error If the binary predicate outputs a non-boolean result.
* @throw cudf::logic_error If the number of rows in left_equality and left_conditional do not
* match.
* @throw cudf::logic_error If the number of rows in right_equality and right_conditional do not
* match.
*
* @param left_equality The left table used for the equality join.
* @param right_equality The right table used for the equality join.
* @param left_conditional The left table used for the conditional join.
* @param right_conditional The right table used for the conditional join.
* @param binary_predicate The condition on which to join.
* @param compare_nulls Whether or not null values join to each other.
* @param output_size_data An optional pair of values indicating the exact output size and the
* number of matches for each row in the larger of the two input tables, left or right (may be
* precomputed using the corresponding mixed_inner_join_size API).
* @param mr Device memory resource used to allocate the returned table and columns' device memory
*
* @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct
* the result of performing a mixed inner join between the four input tables.
*/
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
mixed_inner_join(
table_view const& left_equality,
table_view const& right_equality,
table_view const& left_conditional,
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls = null_equality::EQUAL,
std::optional<std::pair<std::size_t, device_span<size_type const>>> output_size_data = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Returns a pair of row index vectors corresponding to all pairs of
* rows between the specified tables where the columns of the equality table
* are equal and the predicate evaluates to true on the conditional tables,
* or null matches for rows in left that have no match in right.
*
* The first returned vector contains all the row indices from the left
* tables (in unspecified order).
* The corresponding value in the second returned vector is either (1)
* the row index of the matched row from the right tables, if there is a
* match, or (2) an unspecified out-of-bounds value.
*
* If the provided predicate returns NULL for a pair of rows
* (left, right), that pair is not included in the output. It is the user's
* responsibility to choose a suitable compare_nulls value AND use appropriate
* null-safe operators in the expression.
*
* If the provided output size or per-row counts are incorrect, behavior is undefined.
*
* @code{.pseudo}
* left_equality: {{0, 1, 2}}
* right_equality: {{1, 2, 3}}
* left_conditional: {{4, 4, 4}}
* right_conditional: {{3, 4, 5}}
* Expression: Left.Column_0 > Right.Column_0
* Result: {{0, 1, 2}, {None, 0, None}}
* @endcode
*
* @throw cudf::logic_error If the binary predicate outputs a non-boolean result.
* @throw cudf::logic_error If the number of rows in left_equality and left_conditional do not
* match.
* @throw cudf::logic_error If the number of rows in right_equality and right_conditional do not
* match.
*
* @param left_equality The left table used for the equality join.
* @param right_equality The right table used for the equality join.
* @param left_conditional The left table used for the conditional join.
* @param right_conditional The right table used for the conditional join.
* @param binary_predicate The condition on which to join.
* @param compare_nulls Whether or not null values join to each other.
* @param output_size_data An optional pair of values indicating the exact output size and the
* number of matches for each row in the larger of the two input tables, left or right (may be
* precomputed using the corresponding mixed_left_join_size API).
* @param mr Device memory resource used to allocate the returned table and columns' device memory
*
* @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct
* the result of performing a mixed left join between the four input tables.
*/
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
mixed_left_join(
table_view const& left_equality,
table_view const& right_equality,
table_view const& left_conditional,
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls = null_equality::EQUAL,
std::optional<std::pair<std::size_t, device_span<size_type const>>> output_size_data = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Returns a pair of row index vectors corresponding to all pairs of
* rows between the specified tables where the columns of the equality table
* are equal and the predicate evaluates to true on the conditional tables,
* or null matches for rows in either pair of tables that have no matches in
* the other pair.
*
* Taken pairwise, the values from the returned vectors are one of:
* (1) row indices corresponding to matching rows from the left and
* right tables, or (2) a row index and an unspecified out-of-bounds value,
* representing a row from one table without a match in the other.
*
* If the provided predicate returns NULL for a pair of rows
* (left, right), that pair is not included in the output. It is the user's
* responsibility to choose a suitable compare_nulls value AND use appropriate
* null-safe operators in the expression.
*
* If the provided output size or per-row counts are incorrect, behavior is undefined.
*
* @code{.pseudo}
* left_equality: {{0, 1, 2}}
* right_equality: {{1, 2, 3}}
* left_conditional: {{4, 4, 4}}
* right_conditional: {{3, 4, 5}}
* Expression: Left.Column_0 > Right.Column_0
* Result: {{0, 1, 2, None, None}, {None, 0, None, 1, 2}}
* @endcode
*
* @throw cudf::logic_error If the binary predicate outputs a non-boolean result.
* @throw cudf::logic_error If the number of rows in left_equality and left_conditional do not
* match.
* @throw cudf::logic_error If the number of rows in right_equality and right_conditional do not
* match.
*
* @param left_equality The left table used for the equality join.
* @param right_equality The right table used for the equality join.
* @param left_conditional The left table used for the conditional join.
* @param right_conditional The right table used for the conditional join.
* @param binary_predicate The condition on which to join.
* @param compare_nulls Whether or not null values join to each other.
* @param output_size_data An optional pair of values indicating the exact output size and the
* number of matches for each row in the larger of the two input tables, left or right (may be
* precomputed using the corresponding mixed_full_join_size API).
* NOTE(review): only mixed_inner_join_size and mixed_left_join_size appear to be declared
* next to this function; confirm that a mixed_full_join_size API exists before relying on
* this reference.
* @param mr Device memory resource used to allocate the returned table and columns' device memory
*
* @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct
* the result of performing a mixed full join between the four input tables.
*/
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
mixed_full_join(
table_view const& left_equality,
table_view const& right_equality,
table_view const& left_conditional,
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls = null_equality::EQUAL,
std::optional<std::pair<std::size_t, device_span<size_type const>>> output_size_data = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Returns the exact number of matches (rows) when performing a
* mixed inner join between the specified tables where the columns of the
* equality table are equal and the predicate evaluates to true on the
* conditional tables.
*
* If the provided predicate returns NULL for a pair of rows (left, right),
* that pair is not included in the output. It is the user's responsibility to
* choose a suitable compare_nulls value AND use appropriate null-safe
* operators in the expression.
*
* @throw cudf::logic_error If the binary predicate outputs a non-boolean result.
* @throw cudf::logic_error If the number of rows in left_equality and left_conditional do not
* match.
* @throw cudf::logic_error If the number of rows in right_equality and right_conditional do not
* match.
*
* @param left_equality The left table used for the equality join.
* @param right_equality The right table used for the equality join.
* @param left_conditional The left table used for the conditional join.
* @param right_conditional The right table used for the conditional join.
* @param binary_predicate The condition on which to join.
* @param compare_nulls Whether or not null values join to each other.
* @param mr Device memory resource used to allocate the returned table and columns' device memory
*
* @return A pair containing the size that would result from performing the
* requested join and the number of matches for each row in one of the two
* tables. Which of the two tables is used is an implementation detail and
* should not be relied upon; the returned pair should simply be passed to the
* corresponding `mixed_inner_join` API as is.
*/
std::pair<std::size_t, std::unique_ptr<rmm::device_uvector<size_type>>> mixed_inner_join_size(
table_view const& left_equality,
table_view const& right_equality,
table_view const& left_conditional,
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls = null_equality::EQUAL,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Returns the exact number of matches (rows) when performing a
* mixed left join between the specified tables where the columns of the
* equality table are equal and the predicate evaluates to true on the
* conditional tables.
*
* If the provided predicate returns NULL for a pair of rows (left, right),
* that pair is not included in the output. It is the user's responsibility to
* choose a suitable compare_nulls value AND use appropriate null-safe
* operators in the expression.
*
* @throw cudf::logic_error If the binary predicate outputs a non-boolean result.
* @throw cudf::logic_error If the number of rows in left_equality and left_conditional do not
* match.
* @throw cudf::logic_error If the number of rows in right_equality and right_conditional do not
* match.
*
* @param left_equality The left table used for the equality join.
* @param right_equality The right table used for the equality join.
* @param left_conditional The left table used for the conditional join.
* @param right_conditional The right table used for the conditional join.
* @param binary_predicate The condition on which to join.
* @param compare_nulls Whether or not null values join to each other.
* @param mr Device memory resource used to allocate the returned table and columns' device memory
*
* @return A pair containing the size that would result from performing the
* requested join and the number of matches for each row in one of the two
* tables. Which of the two tables is used is an implementation detail and
* should not be relied upon; the returned pair should simply be passed to the
* corresponding `mixed_left_join` API as is.
*/
std::pair<std::size_t, std::unique_ptr<rmm::device_uvector<size_type>>> mixed_left_join_size(
table_view const& left_equality,
table_view const& right_equality,
table_view const& left_conditional,
table_view const& right_conditional,
ast::expression const& binary_predicate,
null_equality compare_nulls = null_equality::EQUAL,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Returns the exact number of matches (rows) when performing a
* conditional inner join between the specified tables where the predicate
Expand Down
7 changes: 5 additions & 2 deletions cpp/include/cudf/table/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,11 @@ template <template <typename> class hash_function, typename Nullate>
class row_hasher {
public:
row_hasher() = delete;
row_hasher(Nullate has_nulls, table_device_view t) : _table{t}, _has_nulls{has_nulls} {}
row_hasher(Nullate has_nulls, table_device_view t, uint32_t seed)
CUDF_HOST_DEVICE row_hasher(Nullate has_nulls, table_device_view t)
: _table{t}, _has_nulls{has_nulls}
{
}
CUDF_HOST_DEVICE row_hasher(Nullate has_nulls, table_device_view t, uint32_t seed)
: _table{t}, _seed(seed), _has_nulls{has_nulls}
{
}
Expand Down
21 changes: 14 additions & 7 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ conditional_join(table_view const& left,
// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d config(swap_tables ? right_num_rows : left_num_rows, DEFAULT_JOIN_BLOCK_SIZE);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;
join_kind kernel_join_type = join_type == join_kind::FULL_JOIN ? join_kind::LEFT_JOIN : join_type;
join_kind const kernel_join_type =
join_type == join_kind::FULL_JOIN ? join_kind::LEFT_JOIN : join_type;

// If the join size was not provided as an input, compute it here.
std::size_t join_size;
Expand Down Expand Up @@ -197,6 +199,13 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Until we add logic to handle the number of non-matches in the right table,
// full joins are not supported in this function. Note that this does not
// prevent actually performing full joins since we do that by calculating the
// left join and then concatenating the complementary right indices.
CUDF_EXPECTS(join_type != join_kind::FULL_JOIN,
"Size estimation is not available for full joins.");

// We can immediately filter out cases where one table is empty. In
// some cases, we return all the rows of the other table with a corresponding
// null index for the empty table; in others, we return an empty output.
Expand Down Expand Up @@ -232,8 +241,7 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
// If none of the input columns actually contain nulls, we can still use the
// non-nullable version of the expression evaluation code path for
// performance, so we capture that information as well.
auto const nullable = cudf::nullable(left) || cudf::nullable(right);
auto const has_nulls = nullable && (cudf::has_nulls(left) || cudf::has_nulls(right));
auto const has_nulls = binary_predicate.may_evaluate_null(left, right, stream);

auto const parser =
ast::detail::expression_parser{binary_predicate, left, right, has_nulls, stream, mr};
Expand All @@ -246,11 +254,10 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d config(swap_tables ? right_num_rows : left_num_rows, DEFAULT_JOIN_BLOCK_SIZE);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

assert(join_type != join_kind::FULL_JOIN);

// Allocate storage for the counter used to get the size of the join output
rmm::device_scalar<std::size_t> size(0, stream, mr);
CHECK_CUDA(stream.value());
Expand Down
Loading

0 comments on commit e4a16ae

Please sign in to comment.