diff --git a/CMakeLists.txt b/CMakeLists.txt index 48a365a90d5..68cde00db78 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -120,6 +120,7 @@ cuda_add_library(gdf SHARED src/scan.cu src/segmented_sorting.cu src/datetimeops.cu + src/sqls_ops.cu ) diff --git a/include/gdf/cffi/functions.h b/include/gdf/cffi/functions.h index f0a14c65542..a1c880085fa 100644 --- a/include/gdf/cffi/functions.h +++ b/include/gdf/cffi/functions.h @@ -404,3 +404,72 @@ gdf_error gdf_max_f32(gdf_column *col, float *dev_result, gdf_size_type dev_resu gdf_error gdf_max_i64(gdf_column *col, int64_t *dev_result, gdf_size_type dev_result_size); gdf_error gdf_max_i32(gdf_column *col, int32_t *dev_result, gdf_size_type dev_result_size); gdf_error gdf_max_i8(gdf_column *col, int8_t *dev_result, gdf_size_type dev_result_size); + +/* + Multi-Column SQL ops: + WHERE (Filtering) + ORDER-BY + GROUP-BY + */ +gdf_error gdf_order_by(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + size_t* d_indx); //out: device-side array of re-rdered row indices + +gdf_error gdf_filter(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + void** d_vals, //in: device-side array of values to filter against (type-erased) + size_t* d_indx, //out: device-side array of row indices that remain after filtering + size_t* new_sz); //out: host-side # rows that remain after filtering + +gdf_error gdf_group_by_sum(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt); //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct + +gdf_error gdf_group_by_min(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt); //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct + + +gdf_error gdf_group_by_max(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt); //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct + + +gdf_error gdf_group_by_avg(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt); //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct + +gdf_error gdf_group_by_count(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt); //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct diff --git a/include/gdf/cffi/types.h b/include/gdf/cffi/types.h index 19fe17ec074..a13e6a7881e 100644 --- a/include/gdf/cffi/types.h +++ b/include/gdf/cffi/types.h @@ -10,9 +10,10 @@ typedef enum { GDF_INT64, GDF_FLOAT32, GDF_FLOAT64, - GDF_DATE32, // int32_t days since the UNIX epoch - GDF_DATE64, // int64_t milliseconds since the UNIX epoch - GDF_TIMESTAMP // Exact timestamp encoded with int64 since UNIX epoch (Default unit millisecond) + GDF_DATE32, // int32_t days since the UNIX epoch + GDF_DATE64, // int64_t milliseconds since the UNIX epoch + GDF_TIMESTAMP,// Exact timestamp encoded with int64 since UNIX epoch (Default unit millisecond) + N_GDF_TYPES, /* additional types should go BEFORE N_GDF_TYPES */ } gdf_dtype; typedef enum { @@ -21,10 +22,12 @@ typedef enum { GDF_UNSUPPORTED_DTYPE, GDF_COLUMN_SIZE_MISMATCH, GDF_COLUMN_SIZE_TOO_BIG, + GDF_DATASET_EMPTY, GDF_VALIDITY_MISSING, GDF_VALIDITY_UNSUPPORTED, GDF_JOIN_DTYPE_MISMATCH, GDF_JOIN_TOO_MANY_COLUMNS, + GDF_UNSUPPORTED_METHOD, } gdf_error; typedef enum { @@ -48,8 +51,28 @@ typedef struct gdf_column_{ gdf_dtype_extra_info dtype_info; } gdf_column; +typedef enum { + GDF_SORT = 0, + GDF_HASH, + N_GDF_METHODS, /* additional methods should go BEFORE N_GDF_METHODS */ +} gdf_method; +typedef enum { + GDF_SUM = 0, + GDF_MIN, + GDF_MAX, + GDF_AVG, + GDF_COUNT, + GDF_COUNT_DISTINCT, + N_GDF_AGG_OPS, /* additional aggregation ops should go BEFORE N_GDF_... */ +} gdf_agg_op; +/* additonal flags */ +typedef struct gdf_context_{ + int flag_sorted; /* 0 = No, 1 = yes */ + gdf_method flag_method; /* what method is used */ + int flag_distinct; /* for COUNT: DISTINCT = 1, else = 0 */ +} gdf_context; struct _OpaqueIpcParser; typedef struct _OpaqueIpcParser gdf_ipc_parser_type; diff --git a/include/sql_set_ops.hpp b/include/sql_set_ops.hpp new file mode 100644 index 00000000000..e22592a6ab3 --- /dev/null +++ b/include/sql_set_ops.hpp @@ -0,0 +1,1000 @@ +/* Copyright 2018 NVIDIA Corporation. All rights reserved. */ + +//C++ style of interface for Multi-column Filter, Order-By, and Group-By functionality + +# pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//Potential PROBLEM: thrust::tuple limited to 10 type args +// +template +using Tuple = thrust::tuple; + +template +using Vector = thrust::device_vector; + + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, std::ostream& os) +{ + thrust::copy(v.begin(), v.end(), std::ostream_iterator(os,",")); + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, typename Vector::const_iterator pos, std::ostream& os) +{ + thrust::copy(v.begin(), pos, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, size_t n, std::ostream& os) +{ + thrust::copy_n(v.begin(), n, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + + +//Multi-column Ordering policies: via Sort, or Hash (for now) +// +enum class ColumnOrderingPolicy { Sort = 0, Hash }; + + +//RTI for "opaque" tuple comparison fctr; +//("opaque" as opposed to "transparent" +// variadic pack expansion, which doesn't +// compile with device functions) +// +//generic-case template: +//Note: need two indices as template args +//in order to avoid reverting the tuple +//for proper comparison order +// +template +struct LesserIndex +{ + //for sort: + // + __host__ __device__ + static bool less(const Tuple& left, const Tuple& right, int i1, int i2) + { + if( thrust::get(left)[i1] < thrust::get(right)[i2] ) + return true; + else if( thrust::get(left)[i1] == thrust::get(right)[i2] ) + { + return LesserIndex::less(left, right, i1, i2); + } + else + return false; + } + + //for unique_by_key, reduce_by_key: + // + __host__ __device__ + static bool equal(const Tuple& left, const Tuple& right, int i1, int i2) + { + if( thrust::get(left)[i1] == thrust::get(right)[i2] ) + return LesserIndex::equal(left, right, i1, i2); + else + return false; + } + + //for filter: + // + template + __host__ __device__ + static bool equal(const Tuple& left, const TplVals& right, int i1) + { + if( thrust::get(left)[i1] == thrust::get(right) ) + return LesserIndex::equal(left, right, i1); + else + return false; + } +}; + +//Partial specialization for bottom of RTI recursion: +// +template +struct LesserIndex +{ + //for sort: + // + __host__ __device__ + static bool less(const Tuple& left, const Tuple& right, int i1, int i2) + { + if( thrust::get(left)[i1] < thrust::get(right)[i2] ) + return true; + else + return false; + } + + //for unique_by_key, reduce_by_key: + // + __host__ __device__ + static bool equal(const Tuple& left, const Tuple& right, int i1, int i2) + { + return thrust::get(left)[i1] == thrust::get(right)[i2]; + } + + //for filter: + // + template + __host__ __device__ + static bool equal(const Tuple& left, const TplVals& right, int i1) + { + return thrust::get(left)[i1] == thrust::get(right); + } +}; + +//RTI for "opaque" tuple of pairs comparison fctr; +//("opaque" as opposed to "transparent" +// variadic pack expansion, which doesn't +// compile with device functions) +// +//generic-case template: +//Note: need two indices as template args +//in order to avoid reverting the tuple +//for proper comparison order +// +template +struct PairsComparer +{ + //version with tuple of pair of pointers: + //one pointer to an array, + //the other to just one value (of same type) + //(for filter) + // + __host__ __device__ + static bool equal(const TuplePairPtr& tplpairptrs, size_t i1) + { + if( thrust::get(tplpairptrs)[i1] == + *(thrust::get(tplpairptrs)) ) + return PairsComparer::equal(tplpairptrs, i1); + else + return false; + } +}; + +//Partial specialization for bottom of RTI recursion: +// +template +struct PairsComparer +{ + //version with tuple of pair of pointers: + //one pointer to an array, + //the other to just one value (of same type) + //(for filter) + // + __host__ __device__ + static bool equal(const TuplePairPtr& tplpairptrs, size_t i1) + { + return (thrust::get(tplpairptrs)[i1] == *(thrust::get(tplpairptrs))); + } +}; + +//########################################################################### +//# Multi-column ORDER-BY: # +//########################################################################### +//args: +//Input: +// sz = # rows; +// tv1 = table as a tuple of columns (pointers to device arrays) +//stream = cudaStream to work in; +// +//Output: +// v = vector of indices re-ordered after sorting; +// +template +__host__ __device__ +void multi_col_order_by(size_t sz, + const TplPtrs& tv1, + IndexT* ptr_d_v, + cudaStream_t stream = NULL) +{ + + thrust::sequence(thrust::cuda::par.on(stream), ptr_d_v, ptr_d_v+sz, 0);//cannot use counting_iterator + // 2 reasons: + //(1.) need to return a container result; + //(2.) that container must be mutable; + + thrust::sort(thrust::cuda::par.on(stream), + ptr_d_v, ptr_d_v+sz, + [tv1] __host__ __device__ (int i1, int i2){ + //C+11 variadic pack expansion doesn't work with + //__host__ __device__ code: + // + //Tuple t1(thrust::get(tv1)[i1]...); + //Tuple t2(thrust::get(tv1)[i2]...); + //return (t1 < t2); + + //use RTI (C++03 style) instead: + // + return LesserIndex::value-1, thrust::tuple_size::value-1>::less(tv1, tv1, i1, i2); + }); +} + +template +void multi_col_order_by(size_t sz, + const TplPtrs& tv1, + VectorIndexT& v, + cudaStream_t stream = NULL) +{ + ///VectorT v(sz, 0); + assert( v.size() >= sz ); + + multi_col_order_by(sz, tv1, v.data().get(), stream); +} + + +//########################################################################### +//# Multi-column Filter: # +//########################################################################### +//Filter tuple of device vectors ("table") +//on given tuple of values; +//Input: +//nrows = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//tvals = tuple of values to filter against; +//stream = cudaStream to work in; +//Output: +//d_flt_indx = device_vector of indx for which a match exists (no holes in array!); +//new_sz = new #rows after filtering; +//Return: +//ret = true if filtered result is non-empty, false otherwise; +// +template +__host__ __device__ +size_t multi_col_filter(size_t nrows, + const TplPtrs& tptrs, + const TplVals& tvals, + IndexT* ptr_d_flt_indx, + cudaStream_t stream = NULL) +{ + //actual filtering happens here: + // + auto ret_iter_last = + thrust::copy_if(thrust::cuda::par.on(stream), + thrust::make_counting_iterator(0), thrust::make_counting_iterator(nrows), + ptr_d_flt_indx, + [tptrs, tvals] __host__ __device__ (size_t indx){ + return LesserIndex::value-1, thrust::tuple_size::value-1>::equal(tptrs, tvals, indx); + }); + + size_t new_sz = thrust::distance(ptr_d_flt_indx,ret_iter_last); + + return new_sz; +} + +//version with tuple of pairs of pointers +//(adjacent pointers of same type; Example: +// thrust::tuple) +// +//CAVEAT: all adjacent pairs of +// pointers must reside on __device__! +// +//(All must be passed by pointers because of type erasure in pygdf +// and, consequently, the NestedIfThenElser expects uniform type erasure) +// +template +__host__ __device__ +size_t multi_col_filter(size_t nrows, + const TplPairsPtrs& tptrs, + IndexT* ptr_d_flt_indx, + cudaStream_t stream = NULL) +{ + //actual filtering happens here: + // + auto ret_iter_last = + thrust::copy_if(thrust::cuda::par.on(stream), + thrust::make_counting_iterator(0), thrust::make_counting_iterator(nrows), + ptr_d_flt_indx, + [tptrs] __host__ __device__ (IndexT indx){ + return PairsComparer::value-1, thrust::tuple_size::value-1>::equal(tptrs, indx); + }); + + size_t new_sz = thrust::distance(ptr_d_flt_indx,ret_iter_last); + + return new_sz; +} + +template +size_t multi_col_filter(size_t nrows, + const TplPtrs& tptrs, + const TplVals& tvals, + VectorIndexT& d_flt_indx, + cudaStream_t stream = NULL) +{ + assert( d_flt_indx.size() >= nrows ); + + size_t new_sz = multi_col_filter(nrows, tptrs, tvals ,d_flt_indx.data().get(), stream); + + assert( new_sz <= nrows ); + return new_sz; +} + + +//########################################################################### +//# Multi-column Group-By: # +//########################################################################### +//group-by is a reduce_by_key +// +//not only COUNT(*) makes sense with multicolumn: +//one can specify a multi-column GROUP-BY criterion +//and one specifc column to aggregate on; +// +//Input: +//sz = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allocations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (COUNT-ed) as a result of group-by; +//d_items = device_vector of items corresponding to indices in d_flt_indx; +//Return: +//ret = pair of iterators into (d_kout, d_vout), respectively; +// +template +__host__ __device__ +thrust::pair +multi_col_group_by_count_via_sort(size_t sz, + const TplPtrs& tptrs, + IndexT* ptr_d_indx, + IndexT* ptr_d_kout, + IndexT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + if( !sorted ) + multi_col_order_by(sz, tptrs, ptr_d_indx, stream); + + thrust::pair ret = + thrust::reduce_by_key(thrust::cuda::par.on(stream), + ptr_d_indx, ptr_d_indx+sz, + thrust::make_constant_iterator(1), + ptr_d_kout, + ptr_d_vout, + [tptrs] __host__ __device__(int key1, int key2){ + return LesserIndex::value-1, thrust::tuple_size::value-1>::equal(tptrs, tptrs, key1, key2); + }); + + //COUNT(*) for each distinct entry gets collected in d_vout; + //DISTINCT COUNT(*) is just: thrust::distance(d_kout.begin(), ret.first) + + return ret; +} + +//generic (purposely do-nothing) implementation +// +template +struct MultiColGroupByCount +{ + //this should NEVER get called + //(hence the assert(false);) + // + __host__ __device__ + thrust::pair + operator() (size_t sz, + const TplPtrs& tptrs, + IndexT* ptr_d_indx, + IndexT* ptr_d_kout, + IndexT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) + { + assert( false ); + return thrust::make_pair(nullptr, nullptr); + } +}; + +//partial specialization for ColumnOrderingPolicy::Sort +// +template +struct MultiColGroupByCount +{ + __host__ __device__ + thrust::pair operator() (size_t sz, + const TplPtrs& tptrs, + IndexT* ptr_d_indx, + IndexT* ptr_d_kout, + IndexT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) + { + return multi_col_group_by_count_via_sort(sz, tptrs, ptr_d_indx, ptr_d_kout, ptr_d_vout, sorted, stream); + } +}; + +template +thrust::pair +multi_col_group_by_count(size_t sz, + const TplPtrs& tptrs, + VectorIndexT& d_indx, + VectorIndexT& d_kout, + VectorIndexT& d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + assert( d_indx.size() >= sz ); + assert( d_kout.size() >= sz ); + assert( d_vout.size() >= sz ); + + using IndexT = typename VectorIndexT::value_type; + + MultiColGroupByCount mcgbc; + + auto ret = mcgbc(sz, tptrs, d_indx.data().get(), d_kout.data().get(), d_vout.data().get(), sorted, stream); + + typename VectorIndexT::iterator fst = d_kout.begin(); + typename VectorIndexT::iterator snd = d_vout.begin(); + thrust::advance(fst, ret.first - d_kout.data().get()); + thrust::advance(snd, ret.second - d_vout.data().get()); + + return thrust::make_pair(fst, snd); +} + + + + + +//group-by is a reduce_by_key +// +//not only COUNT(*) makes sense with multicolumn: +//one can specify a multi-column GROUP-BY criterion +//and one specifc column to aggregate on; +// +//Input: +//sz = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//d_agg = column (device vector) to get aggregated; +//fctr = functor to perform the aggregation +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_agg_p = reordering of d_agg after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (counted) as a result of group-by; +//Return: +//ret = pair of iterators into (d_kout, d_vout), respectively; +// +template +__host__ __device__ +thrust::pair + multi_col_group_by_via_sort(size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + Reducer fctr, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + if( !sorted ) + multi_col_order_by(sz, tptrs, ptr_d_indx, stream); + + thrust::gather(thrust::cuda::par.on(stream), + ptr_d_indx, ptr_d_indx + sz, //map[i] + ptr_d_agg, //source[i] + ptr_d_agg_p); //source[map[i]] + + thrust::pair ret = + thrust::reduce_by_key(thrust::cuda::par.on(stream), + ptr_d_indx, ptr_d_indx + sz, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + [tptrs] __host__ __device__(int key1, int key2){ + return LesserIndex::value-1, thrust::tuple_size::value-1>::equal(tptrs, tptrs, key1, key2); + }, + fctr); + + return ret; +} + +//SUM +// +template +__host__ __device__ +thrust::pair + multi_col_group_by_sum_sort(size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return x+y; + }; + + using ReducerT = decltype(lamb); + + return multi_col_group_by_via_sort(sz, + tptrs, + ptr_d_agg, + lamb, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); +} + +//MIN +// +template +__host__ __device__ +thrust::pair + multi_col_group_by_min_sort(size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return (x(sz, + tptrs, + ptr_d_agg, + lamb, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); +} + +//MAX +// +template +__host__ __device__ +thrust::pair + multi_col_group_by_max_sort(size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return (x>y?x:y); + }; + + using ReducerT = decltype(lamb); + + return multi_col_group_by_via_sort(sz, + tptrs, + ptr_d_agg, + lamb, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); +} + +//AVERAGE +// +template +__host__ __device__ +thrust::pair + multi_col_group_by_avg_sort(size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + IndexT* ptr_d_cout, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto pair_count = multi_col_group_by_count_via_sort(sz, + tptrs, + ptr_d_indx, + ptr_d_kout, + ptr_d_cout, + sorted, + stream); + + auto pair_sum = multi_col_group_by_sum_sort(sz, + tptrs, + ptr_d_agg, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); + + thrust::transform(thrust::cuda::par.on(stream), + ptr_d_cout, ptr_d_cout + sz, + ptr_d_vout, + ptr_d_vout, + [] __host__ __device__ (IndexT n, ValsT sum){ + return sum/static_cast(n); + }); + + return pair_sum; +} + +//generic (purposely do-nothing) implementation +// +template +struct MultiColGroupBy +{ + //this should NEVER get called + //(hence the assert(false);) + // + __host__ __device__ + thrust::pair + operator() (size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + Reducer fctr, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + cudaStream_t stream = NULL) + { + assert( false ); + return thrust::make_pair(nullptr, nullptr); + } +}; + +//partial specialization for ColumnOrderingPolicy::Sort +// +template +struct MultiColGroupBy +{ + __host__ __device__ + thrust::pair + operator() (size_t sz, + const TplPtrs& tptrs, + const ValsT* ptr_d_agg, + Reducer fctr, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) + { + return multi_col_group_by_via_sort(sz, + tptrs, + ptr_d_agg, + fctr, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); + } +}; + +template +thrust::pair + multi_col_group_by(size_t sz, + const TplPtrs& tptrs, + const VectorValsT& d_agg, + Reducer fctr, + VectorIndexT& d_indx, + VectorValsT& d_agg_p, + VectorIndexT& d_kout, + VectorValsT& d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + + + assert( d_indx.size() >= sz ); + assert( d_kout.size() >= sz ); + assert( d_vout.size() >= sz ); + assert( d_agg_p.size() >= sz ); + + // multi_col_order_by(sz, tptrs, d_indx, stream); + + // thrust::gather(thrust::cuda::par.on(stream), + // d_indx.begin(), d_indx.end(), //map[i] + // d_agg.begin(), //source[i] + // d_agg_p.begin()); //source[map[i]] + + // thrust::pair ret = + // thrust::reduce_by_key(thrust::cuda::par.on(stream), + // d_indx.begin(), d_indx.end(), + // d_agg_p.begin(), + // d_kout.begin(), + // d_vout.begin(), + // [tptrs] __host__ __device__(int key1, int key2){ + // return LesserIndex::value-1, thrust::tuple_size::value-1>::equal(tptrs, tptrs, key1, key2); + // }, + // fctr); + // return ret; + + //DISTINCT AGG(*) does NOT make sense for any AGG other than COUNT + + using IndexT = typename VectorIndexT::value_type; + using ValsT = typename VectorValsT::value_type; + + MultiColGroupBy mcgb; + auto ret = mcgb(sz, tptrs, d_agg.data().get(), fctr, d_indx.data().get(), d_agg_p.data().get(), d_kout.data().get(), d_vout.data().get(), sorted, stream); + + typename VectorIndexT::iterator fst = d_kout.begin(); + typename VectorValsT::iterator snd = d_vout.begin(); + thrust::advance(fst, ret.first - d_kout.data().get()); + thrust::advance(snd, ret.second - d_vout.data().get()); + + return thrust::make_pair(fst, snd); +} + +//Multi-column group-by SUM: +// +//Input: +//sz = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//d_agg = column (device vector) to get aggregated; +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_agg_p = reordering of d_agg after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (SUM-ed) as a result of group-by; +//Return: +//ret = pair of iterators into (d_kout, d_vout), respectively; +// +template +thrust::pair + multi_col_group_by_sum(size_t sz, + const TplPtrs& tptrs, + const VectorValsT& d_agg, + VectorIndexT& d_indx, + VectorValsT& d_agg_p, + VectorIndexT& d_kout, + VectorValsT& d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + using ValsT = typename VectorValsT::value_type; + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return x+y; + }; + + using ReducerT = decltype(lamb); + + return multi_col_group_by(sz, tptrs, d_agg, + lamb, + d_indx, d_agg_p, d_kout, d_vout, sorted, stream); +} + +//Multi-column group-by MIN: +// +//Input: +//sz = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//d_agg = column (device vector) to get aggregated; +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_agg_p = reordering of d_agg after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (MINIMIZ-ed) as a result of group-by; +//Return: +//ret = pair of iterators into (d_kout, d_vout), respectively; +// +template +thrust::pair + multi_col_group_by_min(size_t sz, + const TplPtrs& tptrs, + const VectorValsT& d_agg, + VectorIndexT& d_indx, + VectorValsT& d_agg_p, + VectorIndexT& d_kout, + VectorValsT& d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + using ValsT = typename VectorValsT::value_type; + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return (x(sz, tptrs, d_agg, + lamb, + d_indx, d_agg_p, d_kout, d_vout, sorted, stream); +} + +//Multi-column group-by MAX: +// +//Input: +//sz = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//d_agg = column (device vector) to get aggregated; +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_agg_p = reordering of d_agg after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (MXIMIZ-ed) as a result of group-by; +//Return: +//ret = pair of iterators into (d_kout, d_vout), respectively; +// +template +thrust::pair + multi_col_group_by_max(size_t sz, + const TplPtrs& tptrs, + const VectorValsT& d_agg, + VectorIndexT& d_indx, + VectorValsT& d_agg_p, + VectorIndexT& d_kout, + VectorValsT& d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + using ValsT = typename VectorValsT::value_type; + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return (x>y?x:y); + }; + + using ReducerT = decltype(lamb); + + return multi_col_group_by(sz, tptrs, d_agg, + lamb, + d_indx, d_agg_p, d_kout, d_vout, sorted, stream); +} + +//Multi-column group-by AVERAGE: +// +//Input: +//sz = # rows +//tptrs = table as a tuple of columns (pointers to device arrays); +//d_agg = column (device vector) to get aggregated; +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_cout = (COUNT-ed) values as a result of group-by; +//d_agg_p = reordering of d_agg after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (AVERAGE-d) as a result of group-by; +//Return: +//ret = pair of iterators into (d_kout, d_vout), respectively; +// +template +thrust::pair + multi_col_group_by_avg(size_t sz, + const TplPtrs& tptrs, + const VectorValsT& d_agg, + VectorIndexT& d_indx, + VectorIndexT& d_cout, + VectorValsT& d_agg_p, + VectorIndexT& d_kout, + VectorValsT& d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + using IndexT = typename VectorIndexT::value_type; + using ValsT = typename VectorValsT::value_type; + + auto pair_count = multi_col_group_by_count(sz, tptrs, d_indx, d_kout, d_cout, sorted, stream); + + auto pair_sum = multi_col_group_by_sum(sz, tptrs, d_agg, d_indx, d_agg_p, d_kout, d_vout, sorted, stream); + + thrust::transform(thrust::cuda::par.on(stream), + d_cout.begin(), d_cout.end(), + d_vout.begin(), + d_vout.begin(), + [] __host__ __device__ (IndexT n, ValsT sum){ + return sum/static_cast(n); + }); + + return pair_sum; +} + + diff --git a/include/sqls_rtti_comp.hpp b/include/sqls_rtti_comp.hpp new file mode 100644 index 00000000000..ec4334e78b1 --- /dev/null +++ b/include/sqls_rtti_comp.hpp @@ -0,0 +1,712 @@ +/* Copyright 2018 NVIDIA Corporation. All rights reserved. */ + +//Type-erasure C-style interface for Multi-column Filter, Order-By, and Group-By functionality + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//for int_t: +// +#include + +///#include "gdf/cffi/types.h" + +template +struct LesserRTTI +{ + // LesserRTTI(const thrust::device_vector& cols, + // const thrust::device_vector& types): + // columns_(cols.data().get()), + // rtti_(types.data().get()), + // sz_(cols.size()) + // { + // assert( types.size() == sz_ ); + // } + + __host__ __device__ + LesserRTTI(void* const* cols, + int* const types, + size_t sz): + columns_(cols), + rtti_(types), + sz_(sz), + vals_(nullptr) + { + } + + __host__ __device__ + LesserRTTI(void* const* cols, + int* const types, + size_t sz, + const void* const * vals): + columns_(cols), + rtti_(types), + sz_(sz), + vals_(vals) + { + } + + + + __host__ __device__ + bool equal(IndexT row1, IndexT row2) const + { + for(int col_index = 0; col_index < sz_; ++col_index) + { + gdf_dtype col_type = static_cast(rtti_[col_index]); + + OpEqual eq(row1, row2); + switch( type_dispatcher(eq, col_type, col_index) ) + { + case State::False: + return false; + + case State::True: + case State::Undecided: + break; + } + } + return true; + } + + __host__ __device__ + bool equal_v(size_t row1) const + { + for(int col_index = 0; col_index < sz_; ++col_index) + { + gdf_dtype col_type = static_cast(rtti_[col_index]); + + OpEqualV eq(vals_, row1); + switch( type_dispatcher(eq, col_type, col_index) ) + { + case State::False: + return false; + + case State::True: + case State::Undecided: + break; + } + } + return true; + } + + __host__ __device__ + bool less(IndexT row1, IndexT row2) const + { + for(int col_index = 0; col_index < sz_; ++col_index) + { + gdf_dtype col_type = static_cast(rtti_[col_index]); + + OpLess less(row1, row2); + switch( type_dispatcher(less, col_type, col_index) ) + { + case State::False: + return false; + case State::True: + return true; + case State::Undecided: + break; + } + } + return false; + } + + __host__ __device__ + void gather(void** d_ppcols_out, size_t* d_indices, size_t nrows_new) const + { + Gatherer g(d_ppcols_out, + d_indices, + nrows_new); + + for(int col_index = 0; col_index < sz_; ++col_index) + { + gdf_dtype col_type = static_cast(rtti_[col_index]); + + type_dispatcher(g, col_type, col_index); + } + } + + template + __host__ __device__ + static ColType at(int col_index, + IndexT row, + const void* const * columns) + { + return (static_cast(columns[col_index]))[row]; + } + +private: + enum class State {False = 0, True = 1, Undecided = 2}; + + struct OpLess + { + __host__ __device__ + OpLess(IndexT row1, IndexT row2): + row1_(row1), + row2_(row2) + { + } + + template + __host__ __device__ + State operator() (int col_index, + const void* const * columns, + ColType ) + { + ColType res1 = LesserRTTI::at(col_index, row1_, columns); + ColType res2 = LesserRTTI::at(col_index, row2_, columns); + + if( res1 < res2 ) + return State::True; + else if( res1 == res2 ) + return State::Undecided; + else + return State::False; + } + private: + IndexT row1_; + IndexT row2_; + }; + + struct OpEqual + { + __host__ __device__ + OpEqual(IndexT row1, IndexT row2): + row1_(row1), + row2_(row2) + { + } + + template + __host__ __device__ + State operator() (int col_index, + const void* const * columns, + ColType ) + { + ColType res1 = LesserRTTI::at(col_index, row1_, columns); + ColType res2 = LesserRTTI::at(col_index, row2_, columns); + + if( res1 != res2 ) + return State::False; + else + return State::Undecided; + } + private: + IndexT row1_; + IndexT row2_; + }; + + struct OpEqualV + { + __host__ __device__ + OpEqualV(const void* const * vals, IndexT row): + target_vals_(vals), + row_(row) + { + } + + template + __host__ __device__ + State operator() (int col_index, + const void* const * columns, + ColType ) + { + ColType res1 = LesserRTTI::at(col_index, row_, columns); + ColType res2 = LesserRTTI::at(col_index, 0, target_vals_); + + if( res1 != res2 ) + return State::False; + else + return State::Undecided; + } + + private: + const void* const * target_vals_; + IndexT row_; + }; + + struct Gatherer + { + __host__ __device__ + Gatherer(void** d_cols_out, + size_t* d_indices, + size_t nrows_new): + d_cols_out_(d_cols_out), + d_indices_(d_indices), + nrows_new_(nrows_new) + { + } + + template + __host__ __device__ + State operator() (int col_index, + const void* const * columns, + ColType ) + { + const ColType* const d_in = static_cast(columns[col_index]); + ColType* d_out = static_cast(d_cols_out_[col_index]); + thrust::gather(thrust::device, + d_indices_, d_indices_ + nrows_new_, //map of indices + d_in, //source + d_out); //=source[map] + + return State::True; + } + private: + void** d_cols_out_; + size_t* d_indices_; + size_t nrows_new_; + }; + + template + __host__ __device__ + State type_dispatcher(Predicate pred, gdf_dtype col_type, int col_index) const + { + switch( col_type ) + { + case GDF_INT8: + { + using ColType = int8_t;//char; + + ColType dummy; + return pred(col_index, columns_, dummy); + } + case GDF_INT16: + { + using ColType = int16_t;//short; + + ColType dummy; + return pred(col_index, columns_, dummy); + } + case GDF_INT32: + { + using ColType = int32_t;//int; + + ColType dummy; + return pred(col_index, columns_, dummy); + } + case GDF_INT64: + { + using ColType = int64_t;//long; + + ColType dummy; + return pred(col_index, columns_, dummy); + } + case GDF_FLOAT32: + { + using ColType = float; + + ColType dummy; + return pred(col_index, columns_, dummy); + } + case GDF_FLOAT64: + { + using ColType = double; + + ColType dummy; + return pred(col_index, columns_, dummy); + } + + default: + assert( false );//type not handled + } + return State::Undecided; + } + + const void* const * columns_; + const int* const rtti_; + size_t sz_; + const void* const * vals_; //for filtering +}; + +//########################################################################### +//# Multi-column ORDER-BY: # +//########################################################################### +//Version with array of columns, +//using type erasure and RTTI at +//comparison operator level; +// +//args: +//Input: +// nrows = # rows; +// ncols = # columns; +// d_cols = device array to ncols type erased columns; +// d_gdf_t = device array to runtime column types; +// stream = cudaStream to work in; +// +//Output: +// d_indx = vector of indices re-ordered after sorting; +// +template +__host__ __device__ +void multi_col_order_by(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + IndexT* d_indx, + cudaStream_t stream = NULL) +{ + LesserRTTI f(d_cols, d_gdf_t, ncols); + + thrust::sequence(thrust::cuda::par.on(stream), d_indx, d_indx+nrows, 0);//cannot use counting_iterator + // 2 reasons: + //(1.) need to return a container result; + //(2.) that container must be mutable; + + thrust::sort(thrust::cuda::par.on(stream), + d_indx, d_indx+nrows, + [f] __host__ __device__ (IndexT i1, IndexT i2){ + return f.less(i1, i2); + }); +} + +//########################################################################### +//# Multi-column Filter: # +//########################################################################### +//Version with array of columns, +//using type erasure and RTTI at +//comparison operator level; +// +//Filter array of columns ("table") +//on given array of values; +//Input: +// nrows = # rows; +// ncols = # columns; +// d_cols = device array to ncols type erased columns; +// d_gdf_t = device array to runtime column types; +// d_vals = tuple of values to filter against; +// stream = cudaStream to work in; +//Output: +// d_flt_indx = device_vector of indx for which a match exists (no holes in array!); +//Return: +// new_sz = new #rows after filtering; +// +template +__host__ __device__ +size_t multi_col_filter(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + void* const* d_vals, + IndexT* ptr_d_flt_indx, + cudaStream_t stream = NULL) +{ + LesserRTTI f(d_cols, d_gdf_t, ncols, d_vals);//size_t, not IndexT, because of counting_iterator below; + + //actual filtering happens here: + // + auto ret_iter_last = + thrust::copy_if(thrust::cuda::par.on(stream), + thrust::make_counting_iterator(0), thrust::make_counting_iterator(nrows), + ptr_d_flt_indx, + [f] __host__ __device__ (size_t indx){ + return f.equal_v(indx); + }); + + size_t new_sz = thrust::distance(ptr_d_flt_indx,ret_iter_last); + + return new_sz; +} + + +//########################################################################### +//# Multi-column Group-By: # +//########################################################################### +//group-by is a reduce_by_key +// +//not only COUNT(*) makes sense with multicolumn: +//one can specify a multi-column GROUP-BY criterion +//and one specifc column to aggregate on; +// +//Input: +// nrows = # rows; +// ncols = # columns; +// d_cols = device array to ncols type erased columns; +// d_gdf_t = device array to runtime column types; +// stream = cudaStream to work in; +//Output: +// d_indx = reordering of indices after sorting; +// (passed as argument to avoid allocations inside the stream) +// d_kout = indices of rows after group by; +// d_vout = aggregated values (COUNT-ed) as a result of group-by; +// d_items = device_vector of items corresponding to indices in d_flt_indx; +//Return: +// ret = # rows after aggregation; +// +template +__host__ __device__ +size_t +multi_col_group_by_count_sort(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + IndexT* ptr_d_indx, + IndexT* ptr_d_kout, + IndexT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + if( !sorted ) + multi_col_order_by(nrows, ncols, d_cols, d_gdf_t, ptr_d_indx, stream); + + LesserRTTI f(d_cols, d_gdf_t, ncols); + + thrust::pair ret = + thrust::reduce_by_key(thrust::cuda::par.on(stream), + ptr_d_indx, ptr_d_indx+nrows, + thrust::make_constant_iterator(1), + ptr_d_kout, + ptr_d_vout, + [f] __host__ __device__(int key1, int key2){ + return f.equal(key1, key2); + }); + + //COUNT(*) for each distinct entry gets collected in d_vout; + //DISTINCT COUNT(*) is just: thrust::distance(d_kout.begin(), ret.first) + + size_t new_sz = thrust::distance(ptr_d_vout, ret.second); + return new_sz; +} + + +//group-by is a reduce_by_key +// +//COUNT(*) isn't the only one that makes sense with multicolumn: +//one can specify a multi-column GROUP-BY criterion +//and one specifc column to aggregate on; +// +//Input: +// nrows = # rows; +// ncols = # columns; +// d_cols = device array to ncols type erased columns; +// d_gdf_t = device array to runtime column types; +// d_agg = column (device vector) to get aggregated; +//fctr = functor to perform the aggregation +//stream = cudaStream to work in; +//Output: +//d_indx = reordering of indices after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_agg_p = reordering of d_agg after sorting; +// (passed as argument to avoid allcoations inside the stream) +//d_kout = indices of rows after group by; +//d_vout = aggregated values (counted) as a result of group-by; +//Return: +//ret = # rows after aggregation; +// +template +__host__ __device__ +size_t multi_col_group_by_sort(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + const ValsT* ptr_d_agg, + Reducer fctr, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + if( !sorted ) + multi_col_order_by(nrows, ncols, d_cols, d_gdf_t, ptr_d_indx, stream); + + thrust::gather(thrust::cuda::par.on(stream), + ptr_d_indx, ptr_d_indx + nrows, //map[i] + ptr_d_agg, //source[i] + ptr_d_agg_p); //source[map[i]] + + LesserRTTI f(d_cols, d_gdf_t, ncols); + + thrust::pair ret = + thrust::reduce_by_key(thrust::cuda::par.on(stream), + ptr_d_indx, ptr_d_indx + nrows, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + [f] __host__ __device__(int key1, int key2){ + return f.equal(key1, key2); + }, + fctr); + + size_t new_sz = thrust::distance(ptr_d_vout, ret.second); + return new_sz; +} + +template +__host__ __device__ +size_t multi_col_group_by_sum_sort(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return x+y; + }; + + using ReducerT = decltype(lamb); + + return multi_col_group_by_sort(nrows, + ncols, + d_cols, + d_gdf_t, + ptr_d_agg, + lamb, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); +} + +template +__host__ __device__ +size_t multi_col_group_by_min_sort(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return (x +__host__ __device__ +size_t multi_col_group_by_max_sort(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + auto lamb = [] __host__ __device__ (ValsT x, ValsT y){ + return (x>y?x:y); + }; + + using ReducerT = decltype(lamb); + + return multi_col_group_by_sort(nrows, + ncols, + d_cols, + d_gdf_t, + ptr_d_agg, + lamb, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); +} + + +template +__host__ __device__ +size_t multi_col_group_by_avg_sort(size_t nrows, + size_t ncols, + void* const* d_cols, + int* const d_gdf_t, + const ValsT* ptr_d_agg, + IndexT* ptr_d_indx, + IndexT* ptr_d_cout, + ValsT* ptr_d_agg_p, + IndexT* ptr_d_kout, + ValsT* ptr_d_vout, + bool sorted = false, + cudaStream_t stream = NULL) +{ + multi_col_group_by_count_sort(nrows, + ncols, + d_cols, + d_gdf_t, + ptr_d_indx, + ptr_d_kout, + ptr_d_cout, + sorted, + stream); + + size_t new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_gdf_t, + ptr_d_agg, + ptr_d_indx, + ptr_d_agg_p, + ptr_d_kout, + ptr_d_vout, + sorted, + stream); + + thrust::transform(thrust::cuda::par.on(stream), + ptr_d_cout, ptr_d_cout + nrows, + ptr_d_vout, + ptr_d_vout, + [] __host__ __device__ (IndexT n, ValsT sum){ + return sum/static_cast(n); + }); + + return new_sz; +} + + diff --git a/src/sqls_ops.cu b/src/sqls_ops.cu new file mode 100644 index 00000000000..4ba654fdafd --- /dev/null +++ b/src/sqls_ops.cu @@ -0,0 +1,1280 @@ +/* Copyright 2018 NVIDIA Corporation. All rights reserved. */ + +//Type-erasure C-style interface for Multi-column Filter, Order-By, and Group-By functionality + +#include +#include +#include + +///#include "../include/sqls_rtti_comp.hpp" -- CORRECT: put me back +#include "sqls_rtti_comp.hpp" + +//using IndexT = int;//okay... +using IndexT = size_t; + +namespace{ //annonymus + + //helper functions: + // + //flatten AOS info from gdf_columns into SOA (2 arrays): + //(1) column array pointers and (2) types; + // + void soa_col_info(gdf_column* cols, size_t ncols, void** d_cols, int* d_types) + { + std::vector v_cols(ncols,nullptr); + std::vector v_types(ncols, 0); + for(int i=0;i + using Vector = thrust::device_vector; + + void type_dispatcher(gdf_dtype col_type, + int col_index, + gdf_column** h_cols_in, + gdf_column** h_cols_out, + IndexT* d_indices, + size_t nrows_new) + { + switch( col_type ) + { + case GDF_INT8: + { + using ColType = int8_t; + + ColType* d_in = static_cast(h_cols_in[col_index]->data);//pointer semantics (2) + ColType* d_out = static_cast(h_cols_out[col_index]->data); + thrust::gather(thrust::device, + d_indices, d_indices + nrows_new, //map of indices + d_in, //source + d_out); //=source[map] + break; + } + case GDF_INT16: + { + using ColType = int16_t; + + ColType* d_in = static_cast(h_cols_in[col_index]->data); + ColType* d_out = static_cast(h_cols_out[col_index]->data); + thrust::gather(thrust::device, + d_indices, d_indices + nrows_new, //map of indices + d_in, //source + d_out); //=source[map] + break; + } + case GDF_INT32: + { + using ColType = int32_t; + + ColType* d_in = static_cast(h_cols_in[col_index]->data); + ColType* d_out = static_cast(h_cols_out[col_index]->data); + thrust::gather(thrust::device, + d_indices, d_indices + nrows_new, //map of indices + d_in, //source + d_out); //=source[map] + break; + } + case GDF_INT64: + { + using ColType = int64_t; + + ColType* d_in = static_cast(h_cols_in[col_index]->data); + ColType* d_out = static_cast(h_cols_out[col_index]->data); + thrust::gather(thrust::device, + d_indices, d_indices + nrows_new, //map of indices + d_in, //source + d_out); //=source[map] + break; + } + case GDF_FLOAT32: + { + using ColType = float; + + ColType* d_in = static_cast(h_cols_in[col_index]->data); + ColType* d_out = static_cast(h_cols_out[col_index]->data); + thrust::gather(thrust::device, + d_indices, d_indices + nrows_new, //map of indices + d_in, //source + d_out); //=source[map] + break; + } + case GDF_FLOAT64: + { + using ColType = double; + + ColType* d_in = static_cast(h_cols_in[col_index]->data); + ColType* d_out = static_cast(h_cols_out[col_index]->data); + thrust::gather(thrust::device, + d_indices, d_indices + nrows_new, //map of indices + d_in, //source + d_out); //=source[map] + break; + } + + default: + assert( false );//type not handled + } + return;// State::True; + } + + //copy from a set of gdf_columns: h_cols_in + //of size (#ncols): ncols + //to another set of columns : h_cols_out + //by gathering via array of indices: d_indices + //of size: nrows_new + // + void multi_gather_host(size_t ncols, gdf_column** h_cols_in, gdf_column** h_cols_out, IndexT* d_indices, size_t nrows_new) + { + for(int col_index = 0; col_indexdtype; + type_dispatcher(col_type, + col_index, + h_cols_in, + h_cols_out, + d_indices, + nrows_new); + + h_cols_out[col_index]->dtype = col_type; + h_cols_out[col_index]->size = nrows_new; + + //TODO: h_cols_out[col_index]->valid + } + } + + int dtype_size(gdf_dtype col_type) + { + switch( col_type ) + { + case GDF_INT8: + { + using ColType = int8_t; + + return sizeof(ColType); + } + case GDF_INT16: + { + using ColType = int16_t; + + return sizeof(ColType); + } + case GDF_INT32: + { + using ColType = int32_t; + + return sizeof(ColType); + } + case GDF_INT64: + { + using ColType = int64_t; + + return sizeof(ColType); + } + case GDF_FLOAT32: + { + using ColType = float; + + return sizeof(ColType); + } + case GDF_FLOAT64: + { + using ColType = double; + + return sizeof(ColType); + } + + default: + assert( false );//type not handled + } + } + +#ifdef DEBUG_ + void run_echo(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + int flag_sorted, //in: flag specifying if rows are pre-sorted (1) or not (0) + gdf_column agg_in)//in: column to aggregate + { + std::cout<<"############# Echo: #############\n"; + std::cout<<"nrows: "< v(nrows); + int32_t* p = &v[0]; + cudaMemcpy(p, cols[i].data, nrows*sizeof(int32_t), cudaMemcpyDeviceToHost); + std::copy(v.begin(), v.end(), std::ostream_iterator(std::cout,",")); + std::cout<<"\n"; + break; + } + case 2: + { + std::vector v(nrows); + double* p = &v[0]; + cudaMemcpy(p, cols[i].data, nrows*sizeof(double), cudaMemcpyDeviceToHost); + std::copy(v.begin(), v.end(), std::ostream_iterator(std::cout,",")); + std::cout<<"\n"; + break; + } + } + } + + + std::cout<<"col to aggregate on:\n"; + std::vector v(nrows); + double* p = &v[0]; + cudaMemcpy(p, agg_in.data, nrows*sizeof(double), cudaMemcpyDeviceToHost); + std::copy(v.begin(), v.end(), std::ostream_iterator(std::cout,",")); + std::cout<<"\n"; + } +#endif + + + + + + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's nevessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_group_by_count(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + int flag_sorted, //in: flag specififying if rows are pre-sorted (1) or not (0) + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + IndexT* d_indx, //out: device-side array of row indices after sorting + IndexT* d_kout, //out: device-side array of rows after gropu-by + IndexT* d_count, //out: device-side array of aggregated values (COUNT-ed) as a result of group-by; + size_t* new_sz) //out: host-side # rows of d_count +{ + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + *new_sz = multi_col_group_by_count_sort(nrows, + ncols, + d_cols, + d_types, + d_indx, + d_kout, + d_count, + flag_sorted); + + + return GDF_SUCCESS; +} + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's necessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_group_by_sum(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + int flag_sorted, //in: flag specififying if rows are pre-sorted (1) or not (0) + gdf_column& agg_in,//in: column to aggregate + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + IndexT* d_indx, //out: device-side array of row indices after sorting + gdf_column& agg_p, //out: reordering of d_agg after sorting; requires shallow (trivial) copy-construction (see static_assert below); + IndexT* d_kout, //out: device-side array of rows after group-by + gdf_column& c_vout,//out: aggregated column; requires shallow (trivial) copy-construction (see static_assert below); + size_t* new_sz) //out: host-side # rows of d_count +{ + //not supported by g++-4.8: + // + //static_assert(std::is_trivially_copy_constructible::value, + // "error: gdf_column must have shallow copy constructor; otherwise cannot pass output by copy."); + +#ifdef DEBUG_ + run_echo(nrows, //in: # rows + cols, //in: host-side array of gdf_columns + ncols, //in: # cols + flag_sorted, //in: flag specififying if rows are pre-sorted (1) or not (0) + agg_in);//in: column to aggregate +#endif + + assert( agg_in.dtype == agg_p.dtype ); + assert( agg_in.dtype == c_vout.dtype ); + + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + switch( agg_in.dtype ) + { + case GDF_INT8: + { + using T = char; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT16: + { + using T = short; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + case GDF_INT32: + { + using T = int; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT64: + { + using T = long; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT32: + { + using T = float; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT64: + { + using T = double; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_sum_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + default: + return GDF_UNSUPPORTED_DTYPE; + } + + return GDF_SUCCESS; +} + + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's necessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_group_by_min(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + int flag_sorted, //in: flag specififying if rows are pre-sorted (1) or not (0) + gdf_column& agg_in,//in: column to aggregate + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + IndexT* d_indx, //out: device-side array of row indices after sorting + gdf_column& agg_p, //out: reordering of d_agg after sorting; requires shallow (trivial) copy-construction (see static_assert below); + IndexT* d_kout, //out: device-side array of rows after gropu-by + gdf_column& c_vout,//out: aggregated column; requires shallow (trivial) copy-construction (see static_assert below); + size_t* new_sz) //out: host-side # rows of d_count +{ + //not supported by g++-4.8: + // + //static_assert(std::is_trivially_copy_constructible::value, + // "error: gdf_column must have shallow copy constructor; otherwise cannot pass output by copy."); + + assert( agg_in.dtype == agg_p.dtype ); + assert( agg_in.dtype == c_vout.dtype ); + + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + switch( agg_in.dtype ) + { + case GDF_INT8: + { + using T = char; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_min_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT16: + { + using T = short; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_min_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + case GDF_INT32: + { + using T = int; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_min_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT64: + { + using T = long; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_min_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT32: + { + using T = float; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_min_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT64: + { + using T = double; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_min_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + default: + return GDF_UNSUPPORTED_DTYPE; + } + + return GDF_SUCCESS; +} + + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's necessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_group_by_max(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + int flag_sorted, //in: flag specififying if rows are pre-sorted (1) or not (0) + gdf_column& agg_in,//in: column to aggregate + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + IndexT* d_indx, //out: device-side array of row indices after sorting + gdf_column& agg_p, //out: reordering of d_agg after sorting; requires shallow (trivial) copy-construction (see static_assert below); + IndexT* d_kout, //out: device-side array of rows after gropu-by + gdf_column& c_vout,//out: aggregated column; requires shallow (trivial) copy-construction (see static_assert below); + size_t* new_sz) //out: host-side # rows of d_count +{ + //not supported by g++-4.8: + // + //static_assert(std::is_trivially_copy_constructible::value, + // "error: gdf_column must have shallow copy constructor; otherwise cannot pass output by copy."); + + assert( agg_in.dtype == agg_p.dtype ); + assert( agg_in.dtype == c_vout.dtype ); + + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + switch( agg_in.dtype ) + { + case GDF_INT8: + { + using T = char; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_max_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT16: + { + using T = short; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_max_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + case GDF_INT32: + { + using T = int; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_max_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT64: + { + using T = long; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_max_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT32: + { + using T = float; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_max_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT64: + { + using T = double; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_max_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + default: + return GDF_UNSUPPORTED_DTYPE; + } + + return GDF_SUCCESS; +} + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's necessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_group_by_avg(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + int flag_sorted, //in: flag specififying if rows are pre-sorted (1) or not (0) + gdf_column& agg_in,//in: column to aggregate + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + IndexT* d_indx, //out: device-side array of row indices after sorting + IndexT* d_cout, //out: device-side array of (COUNT-ed) values as a result of group-by; + gdf_column& agg_p, //out: reordering of d_agg after sorting; requires shallow (trivial) copy-construction (see static_assert below); + IndexT* d_kout, //out: device-side array of rows after gropu-by + gdf_column& c_vout,//out: aggregated column; requires shallow (trivial) copy-construction (see static_assert below); + size_t* new_sz) //out: host-side # rows of d_count +{ + //not supported by g++-4.8: + // + //static_assert(std::is_trivially_copy_constructible::value, + // "error: gdf_column must have shallow copy constructor; otherwise cannot pass output by copy."); + + assert( agg_in.dtype == agg_p.dtype ); + assert( agg_in.dtype == c_vout.dtype ); + + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + switch( agg_in.dtype ) + { + case GDF_INT8: + { + using T = char; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_avg_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_cout, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT16: + { + using T = short; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_avg_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_cout, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + case GDF_INT32: + { + using T = int; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_avg_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_cout, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_INT64: + { + using T = long; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_avg_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_cout, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT32: + { + using T = float; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_avg_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_cout, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + case GDF_FLOAT64: + { + using T = double; + + T* d_agg = static_cast(agg_in.data); + T* d_agg_p = static_cast(agg_p.data); + T* d_vout = static_cast(c_vout.data); + *new_sz = multi_col_group_by_avg_sort(nrows, + ncols, + d_cols, + d_types, + d_agg, + d_indx, + d_cout, + d_agg_p, + d_kout, + d_vout, + flag_sorted); + + break; + } + + default: + return GDF_UNSUPPORTED_DTYPE; + } + + return GDF_SUCCESS; +} + +gdf_error gdf_group_by_single(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt, //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct + gdf_agg_op op) //aggregation operation +{ + if( ncols == 0 ) + return GDF_DATASET_EMPTY; + + if( ctxt->flag_method == GDF_SORT ) + { + std::vector v_cols(ncols); + for(auto i = 0; i < ncols; ++i) + { + v_cols[i] = *(cols[i]); + } + + gdf_column* h_columns = &v_cols[0]; + size_t nrows = h_columns[0].size; + + if( nrows == 0 ) + return GDF_DATASET_EMPTY; + + size_t n_group = 0; + + Vector d_indx;//allocate only if necessary (see below) + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + void** d_col_data = d_cols.data().get(); + int* d_col_types = d_types.data().get(); + + IndexT* ptr_d_indx = nullptr; + if( out_col_indices ) + ptr_d_indx = static_cast(out_col_indices->data); + else + { + d_indx.resize(nrows); + ptr_d_indx = d_indx.data().get(); + } + + Vector d_sort(nrows, 0); + IndexT* ptr_d_sort = d_sort.data().get(); + + gdf_column c_agg_p; + c_agg_p.dtype = col_agg->dtype; + c_agg_p.size = nrows; + Vector d_agg_p(nrows * dtype_size(c_agg_p.dtype));//this might be PROBLEMatic (seems harmless) + c_agg_p.data = d_agg_p.data().get(); + + switch( op ) + { + case GDF_SUM: + gdf_group_by_sum(nrows, + h_columns, + static_cast(ncols), + ctxt->flag_sorted, + *col_agg, + d_col_data, //allocated + d_col_types,//allocated + ptr_d_sort, //allocated + c_agg_p, //allocated + ptr_d_indx, //allocated (or, passed in) + *out_col_agg, + &n_group); + break; + + case GDF_MIN: + gdf_group_by_min(nrows, + h_columns, + static_cast(ncols), + ctxt->flag_sorted, + *col_agg, + d_col_data, //allocated + d_col_types,//allocated + ptr_d_sort, //allocated + c_agg_p, //allocated + ptr_d_indx, //allocated (or, passed in) + *out_col_agg, + &n_group); + break; + + case GDF_MAX: + gdf_group_by_max(nrows, + h_columns, + static_cast(ncols), + ctxt->flag_sorted, + *col_agg, + d_col_data, //allocated + d_col_types,//allocated + ptr_d_sort, //allocated + c_agg_p, //allocated + ptr_d_indx, //allocated (or, passed in) + *out_col_agg, + &n_group); + break; + + case GDF_AVG: + { + Vector d_cout(nrows, 0); + IndexT* ptr_d_cout = d_cout.data().get(); + + gdf_group_by_avg(nrows, + h_columns, + static_cast(ncols), + ctxt->flag_sorted, + *col_agg, + d_col_data, //allocated + d_col_types,//allocated + ptr_d_sort, //allocated + ptr_d_cout, //allocated + c_agg_p, //allocated + ptr_d_indx, //allocated (or, passed in) + *out_col_agg, + &n_group); + } + break; + case GDF_COUNT_DISTINCT: + { + assert( out_col_agg ); + //assert( out_col_agg->dtype == GDF_INT64 );//==size_t ????? + assert( out_col_agg->size >= 1); + + Vector d_counts(nrows, 0); + + IndexT* ptr_d_vals = d_counts.data().get(); + gdf_group_by_count(nrows, + h_columns, + static_cast(ncols), + ctxt->flag_sorted, + d_col_data, //allocated + d_col_types,//allocated + ptr_d_sort, //allocated + ptr_d_indx, //allocated (or, passed in) + ptr_d_vals, //passed in + &n_group); + IndexT* p_out = static_cast(out_col_agg->data); + p_out[0] = static_cast(n_group); + } + break; + case GDF_COUNT: + { + assert( out_col_agg ); + //assert( out_col_agg->dtype == GDF_INT64 );//==size_t ????? + + IndexT* ptr_d_vals = static_cast(out_col_agg->data); + gdf_group_by_count(nrows, + h_columns, + static_cast(ncols), + ctxt->flag_sorted, + d_col_data, //allocated + d_col_types,//allocated + ptr_d_sort, //allocated + ptr_d_indx, //allocated (or, passed in) + ptr_d_vals, //passed in + &n_group); + } + break; + } + + if( out_col_values ) + { + multi_gather_host(ncols, cols, out_col_values, ptr_d_indx, n_group); + } + + out_col_agg->size = n_group; + if( out_col_indices ) + out_col_indices->size = n_group; + + //TODO: out_->valid = ????? + } + else if( ctxt->flag_method == GDF_HASH ) + { + //TODO: + //HASH-based + } + else + { + return GDF_UNSUPPORTED_METHOD; + } + + return GDF_SUCCESS; +} +}//end unknown namespace + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's nevessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_order_by(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + size_t* d_indx) //out: device-side array of re-rdered row indices +{ + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + multi_col_order_by(nrows, + ncols, + d_cols, + d_types, + d_indx); + + return GDF_SUCCESS; +} + +//apparent duplication of info between +//gdf_column array and two arrays: +// d_cols = data slice of gdf_column array; +// d_types = dtype slice of gdf_column array; +//but it's nevessary because the gdf_column array is host +//(even though its data slice is on device) +// +gdf_error gdf_filter(size_t nrows, //in: # rows + gdf_column* cols, //in: host-side array of gdf_columns + size_t ncols, //in: # cols + void** d_cols, //out: pre-allocated device-side array to be filled with gdf_column::data for each column; slicing of gdf_column array (host) + int* d_types, //out: pre-allocated device-side array to be filled with gdf_colum::dtype for each column; slicing of gdf_column array (host) + void** d_vals, //in: device-side array of values to filter against (type-erased) + size_t* d_indx, //out: device-side array of row indices that remain after filtering + size_t* new_sz) //out: host-side # rows that remain after filtering +{ + //copy H-D: + // + soa_col_info(cols, ncols, d_cols, d_types); + + *new_sz = multi_col_filter(nrows, + ncols, + d_cols, + d_types, + d_vals, + d_indx); + + + return GDF_SUCCESS; +} + +gdf_error gdf_group_by_sum(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt) //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct +{ + return gdf_group_by_single(ncols, cols, col_agg, out_col_indices, out_col_values, out_col_agg, ctxt, GDF_SUM); +} + +gdf_error gdf_group_by_min(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt) //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct +{ + return gdf_group_by_single(ncols, cols, col_agg, out_col_indices, out_col_values, out_col_agg, ctxt, GDF_MIN); +} + +gdf_error gdf_group_by_max(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt) //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct +{ + return gdf_group_by_single(ncols, cols, col_agg, out_col_indices, out_col_values, out_col_agg, ctxt, GDF_MAX); +} + +gdf_error gdf_group_by_avg(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt) //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct +{ + return gdf_group_by_single(ncols, cols, col_agg, out_col_indices, out_col_values, out_col_agg, ctxt, GDF_AVG); +} + +gdf_error gdf_group_by_count(int ncols, // # columns + gdf_column** cols, //input cols + gdf_column* col_agg, //column to aggregate on + gdf_column* out_col_indices, //if not null return indices of re-ordered rows + gdf_column** out_col_values, //if not null return the grouped-by columns + //(multi-gather based on indices, which are needed anyway) + gdf_column* out_col_agg, //aggregation result + gdf_context* ctxt) //struct with additional info: bool is_sorted, flag_sort_or_hash, bool flag_count_distinct +{ + if( ctxt->flag_distinct ) + gdf_group_by_single(ncols, cols, col_agg, out_col_indices, out_col_values, out_col_agg, ctxt, GDF_COUNT_DISTINCT); + else + return gdf_group_by_single(ncols, cols, col_agg, out_col_indices, out_col_values, out_col_agg, ctxt, GDF_COUNT); +} + + diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 8f429b34190..e9e9a6a9897 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -19,5 +19,6 @@ enable_testing() message(STATUS "******** Configuring tests ********") add_subdirectory(foo-sample) +add_subdirectory(sqls) message(STATUS "******** Tests are ready ********") diff --git a/src/tests/sqls/CMakeLists.txt b/src/tests/sqls/CMakeLists.txt new file mode 100644 index 00000000000..981fe9cc949 --- /dev/null +++ b/src/tests/sqls/CMakeLists.txt @@ -0,0 +1,5 @@ +set(sqls_g_tester_SRCS + sqls_g_tester.cu +) + +configure_test(sqls_g_tester "${sqls_g_tester_SRCS}") diff --git a/src/tests/sqls/sqls_g_tester.cu b/src/tests/sqls/sqls_g_tester.cu new file mode 100644 index 00000000000..a495693a5c5 --- /dev/null +++ b/src/tests/sqls/sqls_g_tester.cu @@ -0,0 +1,841 @@ +/* Copyright 2018 NVIDIA Corporation. All rights reserved. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +// + +#include +#include + +// + +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "sqls_rtti_comp.hpp" + +template +using Vector = thrust::device_vector; + +///using IndexT = int;//okay... +using IndexT = size_t; + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, std::ostream& os) +{ + thrust::copy(v.begin(), v.end(), std::ostream_iterator(os,",")); + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, typename Vector::const_iterator pos, std::ostream& os) +{ + thrust::copy(v.begin(), pos, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, size_t n, std::ostream& os) +{ + thrust::copy_n(v.begin(), n, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +template +bool compare(const Vector& d_v, const std::vector& baseline, T eps) +{ + size_t n = baseline.size();//because d_v might be larger + + std::vector h_v(n); + std::vector h_b(n, 0); + + thrust::copy_n(d_v.begin(), n, h_v.begin());//D-H okay... + + return std::inner_product(h_v.begin(), h_v.end(), + baseline.begin(), + true, + [](bool b1, bool b2){ + return b1 && b2; + }, + [eps](T v1, T v2){ + return (std::abs(v1-v2) < eps); + }); +} + +TEST(gdf_group_by_sum, UsageTestSum) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz > 0 ); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = sum on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i vk{5,0,2,4}; + vd1 = {17,12,26,34}; + + flag = compare(d_keys, vk, szeps); + EXPECT_EQ( flag, true ) << "GROUP-BY row indices return unexpected result"; + + flag = compare(d_outd, vd1, deps); + EXPECT_EQ( flag, true ) << "GROUP-BY SUM aggregation returns unexpected result"; +} + +TEST(gdf_group_by_count, UsageTestCount) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz > 0 ); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_INT32; + c_vout.data = d_vals.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = count on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i vk{5,0,2,4}; + std::vector vals{1,1,2,2}; + + flag = compare(d_keys, vk, szeps); + EXPECT_EQ( flag, true ) << "GROUP-BY row indices return unexpected result"; + + flag = compare(d_vals, vals, szeps); + EXPECT_EQ( flag, true ) << "GROUP-BY COUNT aggregation returns unexpected result"; +} + +TEST(gdf_group_by_avg, UsageTestAvg) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = avg on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i vk{5,0,2,4}; + vd1 = {17,12,13,17}; + + flag = compare(d_keys, vk, szeps); + EXPECT_EQ( flag, true ) << "GROUP-BY row indices return unexpected result"; + + flag = compare(d_outd, vd1, deps); + EXPECT_EQ( flag, true ) << "GROUP-BY AVG aggregation returns unexpected result"; +} + +TEST(gdf_group_by_min, UsageTestMin) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + ///c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::vector v_col{2., 4., 5., 7., 11., 3.}; + thrust::device_vector d_col = v_col; + + std::cout<<"aggregate = min on column:\n"; + print_v(d_col, std::cout); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = d_col.data().get(); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i vk{5,0,2,4}; + vd1 = {3,2,4,7}; + + flag = compare(d_keys, vk, szeps); + EXPECT_EQ( flag, true ) << "GROUP-BY row indices return unexpected result"; + + flag = compare(d_outd, vd1, deps); + EXPECT_EQ( flag, true ) << "GROUP-BY MIN aggregation returns unexpected result"; +} + +TEST(gdf_group_by_max, UsageTestMax) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + ///c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::vector v_col{2., 4., 5., 7., 11., 3.}; + thrust::device_vector d_col = v_col; + + std::cout<<"aggregate = max on column:\n"; + print_v(d_col, std::cout); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = d_col.data().get(); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i vk{5,0,2,4}; + vd1 = {3,2,5,11}; + + flag = compare(d_keys, vk, szeps); + EXPECT_EQ( flag, true ) << "GROUP-BY row indices return unexpected result"; + + flag = compare(d_outd, vd1, deps); + EXPECT_EQ( flag, true ) << "GROUP-BY MAX aggregation returns unexpected result"; +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + diff --git a/src/tests/sqls/sqls_g_tester.cu~ b/src/tests/sqls/sqls_g_tester.cu~ new file mode 100644 index 00000000000..7ed3f64dc43 --- /dev/null +++ b/src/tests/sqls/sqls_g_tester.cu~ @@ -0,0 +1,238 @@ +/* Copyright 2018 NVIDIA Corporation. All rights reserved. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +// + +#include +#include + +// + +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "sqls_rtti_comp.hpp" + +template +using Vector = thrust::device_vector; + +///using IndexT = int;//okay... +using IndexT = size_t; + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, std::ostream& os) +{ + thrust::copy(v.begin(), v.end(), std::ostream_iterator(os,",")); + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, typename Vector::const_iterator pos, std::ostream& os) +{ + thrust::copy(v.begin(), pos, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, size_t n, std::ostream& os) +{ + thrust::copy_n(v.begin(), n, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +template +bool compare(const Vector& d_v, const std::vector& baseline, T eps) +{ + size_t n = baseline.size();//because d_v might be larger + + std::vector h_v(n); + std::vector h_b(n, 0); + + thrust::copy_n(d_v.begin(), n, h_v.begin());//D-H okay... + + return std::inner_product(h_v.begin(), h_v.end(), + baseline.begin(), + true, + [](bool b1, bool b2){ + return b1 && b2; + }, + [eps](T v1, T v2){ + return (std::abs(v1-v2) < eps); + }); +} + +TEST(gdf_group_by_sum, UsageTestSum) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = sum on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i.cu -o .exe +// +// nvcc -I/$HOME/Development/Cuda_Thrust -c -w -std=c++11 --expt-extended-lambda sqls_join4.cu +// nvcc -I/$HOME/Development/Cuda_Thrust -c -w -std=c++11 --expt-extended-lambda sqls_ops.cu +// nvcc -I/$HOME/Development/Cuda_Thrust -w -std=c++11 --expt-extended-lambda sqls_join4.cu sqls_ops.cu -o sqls_join.exe +// +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +#include "sqls_rtti_comp.hpp" + +extern gdf_error gdf_filter(size_t nrows, + gdf_column* cols, + size_t ncols, + void** d_cols,//device-side data slicing of gdf_column array (host) + int* d_types, //device-side dtype slicing of gdf_column array (host) + void** d_vals, + size_t* d_indx, + size_t* new_sz); + +template +using Vector = thrust::device_vector; + +///using IndexT = int;//okay... +using IndexT = size_t; + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, std::ostream& os) +{ + thrust::copy(v.begin(), v.end(), std::ostream_iterator(os,",")); + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, typename Vector::const_iterator pos, std::ostream& os) +{ + thrust::copy(v.begin(), pos, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +template class Vector> +__host__ __device__ +void print_v(const Vector& v, size_t n, std::ostream& os) +{ + thrust::copy_n(v.begin(), n, std::ostream_iterator(os,","));//okay + os<<"\n"; +} + +void f_test_multi_filter(void) +{ + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,5}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t nrows = dc1.size(); + assert( nrows == di1.size() ); + assert( nrows == dd1.size() ); + + ///thrust::tuple tptrs{dc1.data().get(), di1.data().get(), dd1.data().get()}; + + int i = 1; + int j = 3; + double d = 13.; + + vc1.resize(1); + vi1.resize(1); + vd1.resize(1); + + vc1[0] = i; + vi1[0] = j; + vd1[0] = d; + + Vector d_ci = vc1; + Vector d_cj = vi1; + Vector d_cd = vd1; + + ///thrust::tuple tvals{i,j,d}; + + size_t new_sz = 0; + Vector d_indices(nrows, 0); + + // thrust::tuple + // tpairs{dc1.data().get(), d_ci.data().get(), + // di1.data().get(), d_cj.data().get(), + // dd1.data().get(), d_cd.data().get()}; + + ///new_sz = multi_col_filter(nrows, tpairs, d_indices.data().get());//ok + + size_t ncols = 3; + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + Vector d_indx(nrows, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = sizeof(int); + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = sizeof(int); + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = sizeof(double); + v_gdf_cols[2].dtype = GDF_FLOAT64; + + std::vector v_vals{static_cast(d_ci.data().get()), + static_cast(d_cj.data().get()), + static_cast(d_cd.data().get())}; + + Vector d_vals = v_vals; + + gdf_column* h_columns = &v_gdf_cols[0]; + void** d_col_data = d_cols.data().get(); + int* d_col_types = d_types.data().get(); + size_t* ptr_d_indx = d_indices.data().get(); + void** ptr_d_vals = d_vals.data().get(); + + gdf_filter(nrows, h_columns, ncols, d_col_data, d_col_types, ptr_d_vals, ptr_d_indx, &new_sz); + + bool res = (new_sz > 0); + + if( res ) + { + std::cout<<"filtered size: "<& vc1, + const std::vector& vi1, + const std::vector& vd1) +{ + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = sum on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i& vc1, + const std::vector& vi1, + const std::vector& vd1) +{ + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_INT32; + c_vout.data = d_vals.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = count on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i& vc1, + const std::vector& vi1, + const std::vector& vd1) +{ + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + ///c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::vector v_col{2., 4., 5., 7., 11., 3.}; + thrust::device_vector d_col = v_col; + + std::cout<<"aggregate = min on column:\n"; + print_v(d_col, std::cout); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = d_col.data().get(); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i& vc1, + const std::vector& vi1, + const std::vector& vd1) +{ + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + ///c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::vector v_col{2., 4., 5., 7., 11., 3.}; + thrust::device_vector d_col = v_col; + + std::cout<<"aggregate = max on column:\n"; + print_v(d_col, std::cout); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = d_col.data().get(); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i& vc1, + const std::vector& vi1, + const std::vector& vd1) +{ + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t sz = dc1.size(); + assert( sz == di1.size() ); + assert( sz == dd1.size() ); + + Vector d_indx(sz, 0); + Vector d_keys(sz, 0); + Vector d_vals(sz, 0); + + size_t ncols = 3; + size_t& nrows = sz; + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + + gdf_column c_agg; + gdf_column c_vout; + + Vector d_outd(sz, 0); + + c_agg.dtype = GDF_FLOAT64; + c_agg.data = dd1.data().get(); + c_agg.size = nrows; + + c_vout.dtype = GDF_FLOAT64; + c_vout.data = d_outd.data().get(); + c_vout.size = nrows; + + size_t n_group = 0; + int flag_sorted = 0; + + std::cout<<"aggregate = avg on column:\n"; + print_v(dd1, std::cout); + + //input + //{ + gdf_context ctxt{0, GDF_SORT, 0}; + std::vector v_pcols(ncols); + for(int i = 0; i < ncols; ++i) + { + v_pcols[i] = &v_gdf_cols[i]; + } + gdf_column** cols = &v_pcols[0];//pointer semantic (2); + //} + + //output: + //{ + Vector d_vc_out(nrows); + Vector d_vi_out(nrows); + Vector d_vd_out(nrows); + + std::vector v_gdf_cols_out(ncols); + v_gdf_cols_out[0].data = d_vc_out.data().get(); + v_gdf_cols_out[0].dtype = GDF_INT32; + v_gdf_cols_out[0].size = nrows; + + v_gdf_cols_out[1].data = d_vi_out.data().get(); + v_gdf_cols_out[1].dtype = GDF_INT32; + v_gdf_cols_out[1].size = nrows; + + v_gdf_cols_out[2].data = d_vd_out.data().get(); + v_gdf_cols_out[2].dtype = GDF_FLOAT64; + v_gdf_cols_out[2].size = nrows; + + std::vector h_cols_out(ncols); + for(int i=0; i vc1{1,1,1}; + std::vector vi1{1,1,0}; + std::vector vd1{12., 11., 17.}; + + Vector dc1 = vc1; + Vector di1 = vi1; + Vector dd1 = vd1; + + size_t nrows = dc1.size(); + assert( nrows == di1.size() ); + assert( nrows == dd1.size() ); + + Vector dv(nrows, 0); + ///multi_col_order_by(nrows, tv1, dv);//okay + + size_t ncols = 3; + + std::vector v_cols{static_cast(dc1.data().get()), + static_cast(di1.data().get()), + static_cast(dd1.data().get())}; + std::vector v_types{static_cast(GDF_INT32), + static_cast(GDF_INT32), + static_cast(GDF_FLOAT64)}; + + + Vector d_cols(ncols, nullptr); + Vector d_types(ncols, 0); + Vector d_indx(nrows, 0); + + std::vector v_gdf_cols(ncols); + v_gdf_cols[0].data = static_cast(dc1.data().get()); + v_gdf_cols[0].size = nrows; + v_gdf_cols[0].dtype = GDF_INT32; + + v_gdf_cols[1].data = static_cast(di1.data().get()); + v_gdf_cols[1].size = nrows; + v_gdf_cols[1].dtype = GDF_INT32; + + v_gdf_cols[2].data = static_cast(dd1.data().get()); + v_gdf_cols[2].size = nrows; + v_gdf_cols[2].dtype = GDF_FLOAT64; + + gdf_column* h_columns = &v_gdf_cols[0]; + void** d_col_data = d_cols.data().get(); + int* d_col_types = d_types.data().get(); + size_t* ptr_dv = d_indx.data().get(); + + gdf_order_by(nrows, h_columns, ncols, d_col_data, d_col_types, ptr_dv); + + + std::cout<<"multisort order:\n"; + print_v(d_indx, std::cout); + + //should return: + //multisort order: + //2,1,0, + } + { + //okay: + // + std::vector vc1{1,1,1,1,1,1}; + std::vector vi1{1,3,3,5,5,0}; + std::vector vd1{12., 13., 13., 17., 17., 17}; + + std::cout<<"multi-column group-by experiments:\n"; + + test_gb_count_api_2(vc1, vi1, vd1);//okay + + test_gb_sum_api_2(vc1, vi1, vd1);//okay + + test_gb_avg_api_2(vc1, vi1, vd1);//okay + + test_gb_min_api_2(vc1, vi1, vd1);//okay + + test_gb_max_api_2(vc1, vi1, vd1);//okay + //} + } + + { + std::cout<<"Filtering experiments:\n"; + f_test_multi_filter(); + } + + std::cout << "Done!" << std::endl; + return 0; +}