diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt index 44863aa22..70c9cd2a5 100644 --- a/src/main/cpp/CMakeLists.txt +++ b/src/main/cpp/CMakeLists.txt @@ -193,10 +193,10 @@ add_library( src/DateTimeRebaseJni.cpp src/DecimalUtilsJni.cpp src/GpuTimeZoneDBJni.cpp - src/HLLPPHostUDFJni.cpp src/HashJni.cpp src/HistogramJni.cpp src/HostTableJni.cpp + src/HyperLogLogPlusPlusHostUDFJni.cpp src/JSONUtilsJni.cpp src/NativeParquetJni.cpp src/ParseURIJni.cpp @@ -218,8 +218,9 @@ add_library( src/from_json_to_structs.cu src/get_json_object.cu src/histogram.cu - src/hllpp_host_udf.cu - src/hllpp.cu + src/hive_hash.cu + src/hyper_log_log_plus_plus.cu + src/hyper_log_log_plus_plus_host_udf.cu src/json_utils.cu src/murmur_hash.cu src/parse_uri.cu @@ -229,7 +230,6 @@ add_library( src/timezones.cu src/utilities.cu src/xxhash64.cu - src/hive_hash.cu src/zorder.cu ) diff --git a/src/main/cpp/src/HLLPPHostUDFJni.cpp b/src/main/cpp/src/HyperLogLogPlusPlusHostUDFJni.cpp similarity index 79% rename from src/main/cpp/src/HLLPPHostUDFJni.cpp rename to src/main/cpp/src/HyperLogLogPlusPlusHostUDFJni.cpp index a80a78c6b..adf5da52f 100644 --- a/src/main/cpp/src/HLLPPHostUDFJni.cpp +++ b/src/main/cpp/src/HyperLogLogPlusPlusHostUDFJni.cpp @@ -15,19 +15,22 @@ */ #include "cudf_jni_apis.hpp" -#include "hllpp.hpp" -#include "hllpp_host_udf.hpp" +#include "hyper_log_log_plus_plus.hpp" +#include "hyper_log_log_plus_plus_host_udf.hpp" extern "C" { -JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_HLLPPHostUDF_createHLLPPHostUDF( - JNIEnv* env, jclass, jint agg_type, int precision) +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_HyperLogLogPlusPlusHostUDF_createHLLPPHostUDF(JNIEnv* env, + jclass, + jint agg_type, + int precision) { try { cudf::jni::auto_set_device(env); auto udf_ptr = [&] { // The value of agg_type must be sync with - // `HLLPPHostUDF.java#AggregationType`. + // `HyperLogLogPlusPlusHostUDF.java#AggregationType`. 
switch (agg_type) { case 0: return spark_rapids_jni::create_hllpp_reduction_host_udf(precision); case 1: return spark_rapids_jni::create_hllpp_reduction_merge_host_udf(precision); @@ -43,10 +46,8 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_HLLPPHostUDF_createHLLP } JNIEXPORT jlong JNICALL -Java_com_nvidia_spark_rapids_jni_HLLPPHostUDF_estimateDistinctValueFromSketches(JNIEnv* env, - jclass, - jlong sketches, - jint precision) +Java_com_nvidia_spark_rapids_jni_HyperLogLogPlusPlusHostUDF_estimateDistinctValueFromSketches( + JNIEnv* env, jclass, jlong sketches, jint precision) { JNI_NULL_CHECK(env, sketches, "Sketch column is null", 0); try { diff --git a/src/main/cpp/src/hllpp.cu b/src/main/cpp/src/hyper_log_log_plus_plus.cu similarity index 96% rename from src/main/cpp/src/hllpp.cu rename to src/main/cpp/src/hyper_log_log_plus_plus.cu index 8d39c6686..4ff785055 100644 --- a/src/main/cpp/src/hllpp.cu +++ b/src/main/cpp/src/hyper_log_log_plus_plus.cu @@ -55,8 +55,9 @@ namespace { */ __device__ inline int get_register_value(int64_t const ten_registers, int reg_idx) { - int64_t shift_mask = MASK << (REGISTER_VALUE_BITS * reg_idx); - int64_t v = (ten_registers & shift_mask) >> (REGISTER_VALUE_BITS * reg_idx); + auto const shift_bits = REGISTER_VALUE_BITS * reg_idx; + auto const shift_mask = MASK << shift_bits; + auto const v = (ten_registers & shift_mask) >> shift_bits; return static_cast(v); } @@ -418,7 +419,7 @@ std::unique_ptr group_hllpp(cudf::column_view const& input, auto num_long_cols = num_registers_per_sketch / REGISTERS_PER_LONG + 1; auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) { return cudf::make_numeric_column( - cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::ALL_VALID, stream, mr); + cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr); }); auto children = std::vector>(results_iter, results_iter + num_long_cols); @@ -609,7 +610,7 @@ 
std::unique_ptr group_merge_hllpp( // create output columns auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) { return cudf::make_numeric_column( - cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::ALL_VALID, stream, mr); + cudf::data_type{cudf::type_id::INT64}, num_groups, cudf::mask_state::UNALLOCATED, stream, mr); }); auto results = std::vector>(results_iter, results_iter + num_long_cols); @@ -705,7 +706,7 @@ std::unique_ptr reduce_hllpp(cudf::column_view const& input, auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) { return cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT64}, 1 /**num_groups*/, - cudf::mask_state::ALL_VALID, + cudf::mask_state::UNALLOCATED, stream, mr); }); @@ -773,7 +774,7 @@ std::unique_ptr reduce_merge_hllpp(cudf::column_view const& input, auto const results_iter = cudf::detail::make_counting_transform_iterator(0, [&](int i) { return cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT64}, 1 /** num_rows */, - cudf::mask_state::ALL_VALID, + cudf::mask_state::UNALLOCATED, stream, mr); }); @@ -814,13 +815,13 @@ std::unique_ptr reduce_merge_hllpp(cudf::column_view const& input, } struct estimate_fn { - cudf::device_span sketch_longs; - int const precision; - int64_t* const out; + cudf::device_span sketches; + int64_t* out; + int precision; __device__ void operator()(cudf::size_type const idx) const { - auto const num_regs = 1ull << precision; + auto const num_regs = 1 << precision; double sum = 0; int zeroes = 0; @@ -828,7 +829,7 @@ struct estimate_fn { // each long contains 10 register values int long_col_idx = reg_idx / REGISTERS_PER_LONG; int reg_idx_in_long = reg_idx % REGISTERS_PER_LONG; - int reg = get_register_value(sketch_longs[long_col_idx][idx], reg_idx_in_long); + int reg = get_register_value(sketches[long_col_idx][idx], reg_idx_in_long); sum += double{1} / static_cast(1ull << reg); zeroes += reg == 0; } @@ -848,7 +849,7 
@@ std::unique_ptr group_hyper_log_log_plus_plus( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); + CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); auto adjust_precision = precision > MAX_PRECISION ? MAX_PRECISION : precision; return group_hllpp(input, num_groups, group_lables, adjust_precision, stream, mr); } @@ -861,7 +862,7 @@ std::unique_ptr group_merge_hyper_log_log_plus_plus( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); + CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); CUDF_EXPECTS(input.type().id() == cudf::type_id::STRUCT, "HyperLogLogPlusPlus buffer type must be a STRUCT of long columns."); for (auto i = 0; i < input.num_children(); i++) { @@ -880,7 +881,7 @@ std::unique_ptr reduce_hyper_log_log_plus_plus(cudf::column_view c rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); + CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); auto adjust_precision = precision > MAX_PRECISION ? 
MAX_PRECISION : precision; return reduce_hllpp(input, adjust_precision, stream, mr); } @@ -891,7 +892,7 @@ std::unique_ptr reduce_merge_hyper_log_log_plus_plus( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); + CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); CUDF_EXPECTS(input.type().id() == cudf::type_id::STRUCT, "HyperLogLogPlusPlus buffer type must be a STRUCT of long columns."); for (auto i = 0; i < input.num_children(); i++) { @@ -910,13 +911,21 @@ std::unique_ptr estimate_from_hll_sketches(cudf::column_view const rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision is bigger than 4."); + CUDF_EXPECTS(precision >= 4, "HyperLogLogPlusPlus requires precision >= 4."); + CUDF_EXPECTS(input.type().id() == cudf::type_id::STRUCT, + "HyperLogLogPlusPlus buffer type must be a STRUCT of long columns."); + for (auto i = 0; i < input.num_children(); i++) { + CUDF_EXPECTS(input.child(i).type().id() == cudf::type_id::INT64, + "HyperLogLogPlusPlus buffer type must be a STRUCT of long columns."); + } auto const input_iter = cudf::detail::make_counting_transform_iterator( 0, [&](int i) { return input.child(i).begin(); }); - auto input_cols = std::vector(input_iter, input_iter + input.num_children()); - auto d_inputs = cudf::detail::make_device_uvector_async(input_cols, stream, mr); - auto result = cudf::make_numeric_column( - cudf::data_type{cudf::type_id::INT64}, input.size(), cudf::mask_state::ALL_VALID, stream); + auto const h_input_ptrs = + std::vector(input_iter, input_iter + input.num_children()); + auto d_inputs = cudf::detail::make_device_uvector_async( + h_input_ptrs, stream, cudf::get_current_device_resource_ref()); + auto result = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::INT64}, input.size(), 
cudf::mask_state::UNALLOCATED, stream, mr); // evaluate from struct thrust::for_each_n(rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(0), diff --git a/src/main/cpp/src/hllpp.hpp b/src/main/cpp/src/hyper_log_log_plus_plus.hpp similarity index 80% rename from src/main/cpp/src/hllpp.hpp rename to src/main/cpp/src/hyper_log_log_plus_plus.hpp index d93e1debd..33df3b37a 100644 --- a/src/main/cpp/src/hllpp.hpp +++ b/src/main/cpp/src/hyper_log_log_plus_plus.hpp @@ -18,9 +18,9 @@ #include #include #include +#include #include -#include namespace spark_rapids_jni { @@ -56,8 +56,8 @@ std::unique_ptr group_hyper_log_log_plus_plus( int64_t const num_groups, cudf::device_span group_lables, int64_t const precision, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** * Merge HyperLogLogPlusPlus(HLLPP) sketches in the same group. @@ -69,18 +69,19 @@ std::unique_ptr group_merge_hyper_log_log_plus_plus( int64_t const num_groups, cudf::device_span group_lables, int64_t const precision, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** * Compute hash codes for the input, generate HyperLogLogPlusPlus(HLLPP) * sketches from hash codes, and merge all the sketches into one sketch, output * is a struct scalar with multiple long values. 
*/ -std::unique_ptr reduce_hyper_log_log_plus_plus(cudf::column_view const& input, - int64_t const precision, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr reduce_hyper_log_log_plus_plus( + cudf::column_view const& input, + int64_t const precision, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** * Merge all HyperLogLogPlusPlus(HLLPP) sketches in the input column into one @@ -90,8 +91,8 @@ std::unique_ptr reduce_hyper_log_log_plus_plus(cudf::column_view c std::unique_ptr reduce_merge_hyper_log_log_plus_plus( cudf::column_view const& input, int64_t const precision, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** * Estimate count distinct values for the input which contains @@ -103,6 +104,5 @@ std::unique_ptr estimate_from_hll_sketches( cudf::column_view const& input, int precision, rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); } // namespace spark_rapids_jni diff --git a/src/main/cpp/src/hllpp_host_udf.cu b/src/main/cpp/src/hyper_log_log_plus_plus_host_udf.cu similarity index 97% rename from src/main/cpp/src/hllpp_host_udf.cu rename to src/main/cpp/src/hyper_log_log_plus_plus_host_udf.cu index 370b906b6..a112117c3 100644 --- a/src/main/cpp/src/hllpp_host_udf.cu +++ b/src/main/cpp/src/hyper_log_log_plus_plus_host_udf.cu @@ -13,9 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - -#include "hllpp.hpp" -#include "hllpp_host_udf.hpp" +#include "hyper_log_log_plus_plus.hpp" +#include "hyper_log_log_plus_plus_host_udf.hpp" #include #include @@ -126,7 +125,8 @@ struct hllpp_udf : cudf::host_udf_base { std::move(children), 0, // null count rmm::device_buffer{}, // null mask - stream); + stream, + mr); } } diff --git a/src/main/cpp/src/hllpp_host_udf.hpp b/src/main/cpp/src/hyper_log_log_plus_plus_host_udf.hpp similarity index 100% rename from src/main/cpp/src/hllpp_host_udf.hpp rename to src/main/cpp/src/hyper_log_log_plus_plus_host_udf.hpp diff --git a/src/main/java/com/nvidia/spark/rapids/jni/HLLPPHostUDF.java b/src/main/java/com/nvidia/spark/rapids/jni/HyperLogLogPlusPlusHostUDF.java similarity index 98% rename from src/main/java/com/nvidia/spark/rapids/jni/HLLPPHostUDF.java rename to src/main/java/com/nvidia/spark/rapids/jni/HyperLogLogPlusPlusHostUDF.java index 9018474c2..6d09be3de 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/HLLPPHostUDF.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/HyperLogLogPlusPlusHostUDF.java @@ -23,7 +23,7 @@ /** * HyperLogLogPlusPlus(HLLPP) host UDF aggregation utils */ -public class HLLPPHostUDF { +public class HyperLogLogPlusPlusHostUDF { static { NativeDepsLoader.loadNativeDeps(); }