Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Fix ORC related failed UT #3437

Closed
wants to merge 182 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
182 commits
Select commit Hold shift + click to select a range
8cc1e2c
[GLUTEN-3417][CH] Reduce memory consumption in Expand operator #3418
lgbo-ustc Oct 18, 2023
7fbd3fc
[VL] Remove ColumnarToRow for Gluten columnar table cache (#3430)
PHILO-HE Oct 18, 2023
1ab906a
[VL] Copy compress small partition buffer (#3420)
marin-ma Oct 18, 2023
353c45a
[CORE] fix CoalesceExec (#3372)
zhouyuan Oct 18, 2023
4dbb181
Revert "[VL] Copy compress small partition buffer (#3420)" (#3433)
marin-ma Oct 18, 2023
6cbd786
[GLUTEN-2961][VL] Integrate with upstream Velox (#3341)
rui-mo Oct 19, 2023
048d981
[VL] Copy-compress small partition buffer (#3434)
marin-ma Oct 19, 2023
a713d5e
[GLUTEN-3425] Create not existing HDFS folder when writing HDFS file …
JkSelf Oct 19, 2023
6090cea
[CORE] Code refactor: simplify transformer classes (#3426)
ulysses-you Oct 19, 2023
86abf0a
[VL] Increase kAbandonPartialAggregationMinRows (#3439)
rui-mo Oct 19, 2023
a9324e6
[VL][DOC] Refine the document about parquet write (#3441)
JkSelf Oct 19, 2023
60e92e4
[GLUTEN-2961][VL][FOLLOWUP] Fix issue on macOS (#3455)
ulysses-you Oct 20, 2023
3e73f32
[GLUTEN-3361] Support spark 3.4 in Gluten (#3360)
JkSelf Oct 20, 2023
84599e1
[GLUTEN-3431][CH] Fix convertion from Inf to int
lgbo-ustc Oct 20, 2023
30d78a1
[GLUTEN-3480][CH] Fix incorrect metrics values for ch backend (#3481)
zzcclp Oct 22, 2023
f1a0a3e
[GLUTEN-3467] Fix 'Names of tuple elements must be unique' error for …
zzcclp Oct 23, 2023
83d6cbf
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231023) (#3482)
lwz9103 Oct 23, 2023
327b8eb
[VL] Decouple JNI object storing code from ExecutionCtx (#3457)
zhztheplayer Oct 23, 2023
71b3793
[VL] Fix duplicate metrics updating caused by R2C (#2865)
rui-mo Oct 23, 2023
04d8ac2
[GLUTEN-3446][CH] Fix unprecise issue when parse float from string (#…
taiyang-li Oct 23, 2023
400a161
[VL] Allow to enable or disable huge page allocation feature in Velox…
marin-ma Oct 23, 2023
c9014ae
GLUTEN-3361] Fix the compile issue with wrong Empty2Null package path…
JkSelf Oct 23, 2023
ecb5cef
[GLUTEN-3489] Fix exception when in-filter contains null value (#3491)
lwz9103 Oct 23, 2023
19335ba
[GLUTEN-3459] Skip unnecessary local sort for static partition write …
lwz9103 Oct 23, 2023
5cec4f9
Fix a typo in error message of native backend initialization (#3495)
j7nhai Oct 23, 2023
157f42c
[VL] Fix a compile issue after rebasing velox (#3484)
PHILO-HE Oct 23, 2023
edc2c53
fix round issue: https://github.com/oap-project/gluten/issues/3462 (#…
taiyang-li Oct 24, 2023
fd44284
[VL] Remove unnecessary registerFileSystem and registerReader
Yohahaha Oct 24, 2023
d8f310a
[VL] Add config for glog verbose level and severity level (#3454)
Yohahaha Oct 24, 2023
cebe098
[VL] Fix Arrow ABI include (#3493)
marin-ma Oct 24, 2023
2aaf07b
[VL] Register row_constructor_with_null (#3499)
rui-mo Oct 24, 2023
7d5e8fb
[GLUTEN-3450][CH] Support allowPrecisionLoss=false (#3463)
loneylee Oct 24, 2023
3637aee
[VL] Add stream insertion operator for SparkTaskInfo on native side (…
Yohahaha Oct 24, 2023
471bd81
[VL] Remove unnecessary registerConnectorFactory (#3442)
Yohahaha Oct 24, 2023
edf5c50
[VL] Remove the usage of isUnquotedPathCharacter in Velox (#3503)
rui-mo Oct 24, 2023
277d1c4
[GLUTEN-3412][CH]Bug fix element_at 1st argument is map type (#3413)
KevinyhZou Oct 24, 2023
9197780
[VL] Use existing constructor for BigintRange and MultiRange (#3508)
rui-mo Oct 25, 2023
43c6a42
Fix cache output if selectedAttributes has wrong ordering with cacheA…
ulysses-you Oct 25, 2023
778e8c5
[GLUTEN-3512][CH] tolerate empty blocks when native writing (#3513)
exmy Oct 25, 2023
f2ea6ca
[GLUTEN-3511][CORE] Support to add custom aggregate functions for ext…
zzcclp Oct 25, 2023
7cf1a9d
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231025) (#3518)
lwz9103 Oct 25, 2023
0e867cf
[GLUTEN-3501][CH]Skip null values in to_json function (#3523)
taiyang-li Oct 25, 2023
caf959e
[CORE] Fix convertBroadcastExchangeToColumnar for non-WholeStageCodeg…
liujiayi771 Oct 25, 2023
27e6f84
[GLUTEN-3383][CH] Respect spark config spark.sql.orc.compression.code…
taiyang-li Oct 25, 2023
05650cc
[VL] Adapt to latest rebased velox (10/25) (#3519)
PHILO-HE Oct 25, 2023
dd7eab4
[VL] Fix native cmake include path (#3502)
Yohahaha Oct 26, 2023
9cd88de
[Core] Shuffle the configured local directories for local spills (#3525)
marin-ma Oct 26, 2023
84b56e8
[GLUTEN-3509][CH]Fix partition lock problems in shuffle write
KevinyhZou Oct 26, 2023
02bf40c
[VL] Simplify the conversion from Substrait type to Velox type (#3471)
rui-mo Oct 26, 2023
eb46721
[VL] Apply patch to Velox for compilation (#3520)
rui-mo Oct 26, 2023
d571c1f
[CORE] Simplify hasNext and next functions of iterator (#3490)
rui-mo Oct 26, 2023
2ad99de
[VL] Fix single partitioning evict error (#3522)
marin-ma Oct 26, 2023
77a4b18
[VL] Add EvictGuard to avoid spilling recursively in ShuffleWriter (#…
marin-ma Oct 26, 2023
5038204
[GLUTEN-3516][VL] Fix centos8 build failure when HDFS enabled (#3515)
j7nhai Oct 26, 2023
87372db
[CELEBORN] Optimize the performance of Celeborn's pushPartitionData (…
kerwin-zk Oct 26, 2023
f03d78f
[GLUTEN-3531][VL] Build: Undefined reference while linking velox_plan…
zhztheplayer Oct 26, 2023
8e8d25b
[GLUTEN-3535][CH] Support hive transform dirs recursive (#3536)
loneylee Oct 26, 2023
8fa19ba
[GLUTEN-3534][CH] Fix incorrect logic of judging whether supports pre…
zzcclp Oct 26, 2023
a03e2b3
[GLUTEN-3297][CH] Refactor filter push down framework in gluten, supp…
taiyang-li Oct 26, 2023
d3b91cd
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231026) (#3530)
lwz9103 Oct 26, 2023
db7f98b
[GLUTEN-3425] [VL] Fix the issue of incorrectly creating an HDFS path…
JkSelf Oct 27, 2023
2085835
[GLUTEN-3538][VL] Build: Unregistered hive connector factory in debug…
zhztheplayer Oct 27, 2023
a21395e
[VL] Refine shuffle writer return types and namespace (#3544)
marin-ma Oct 27, 2023
478e0c1
[CORE] Do not fall back eagerly with one ColumnarToRowExec (#3496)
ulysses-you Oct 27, 2023
a7b68c3
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231028) (#3555)
lwz9103 Oct 28, 2023
39a5162
[VL] Iterator's and its payloads' lifecycle improvements (#3526)
zhztheplayer Oct 30, 2023
f4c419e
[HOTFIX][CH] Ignore 'test hive parquet_orc table, all columns being p…
zzcclp Oct 30, 2023
599d959
[VL] Use value of spark.io.compression.codec as Velox's spill codec
jinchengchenghh Oct 31, 2023
d6d996c
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231031) (#3568)
lwz9103 Oct 31, 2023
d0ec931
[VL] Shrink min sized partition buffers and spill (#3265)
marin-ma Oct 31, 2023
f37e9af
[GLUTEN-3359] Add Spark3.4 unit test framework (#3497)
JkSelf Nov 1, 2023
0c59a5d
[CORE] Simplify WholeStageTransformer and BroadcastBuildSideRDD (#3574)
ulysses-you Nov 1, 2023
697b2bd
[VL] Fix an undefined reference issue to adapt to rebased velox (11/0…
PHILO-HE Nov 1, 2023
81fae1e
[CORE] Consider the cost when applying stage fallback policy (#3569)
PHILO-HE Nov 2, 2023
27defd8
[VL] In OAP Velox fork, simplify shrinking/spilling code for Velox me…
zhztheplayer Nov 2, 2023
e597c59
[VL] Support left Spark function (#3588)
PHILO-HE Nov 2, 2023
f82737a
[VL] Add EvictState and EvictGuard in shuffle writer (#3585)
marin-ma Nov 2, 2023
805d909
[GLUTEN-3590][CORE] Reduce driver memory usage by using serialized by…
exmy Nov 2, 2023
78104be
Clear the buffer before serialize (#3578)
JkSelf Nov 2, 2023
5f8591e
[VL] Gluten-it: Fix a trivial cli parameter parsing bug (#3597)
zhztheplayer Nov 3, 2023
55f1480
[VL] Refine setting shuffle writer local directories (#3601)
marin-ma Nov 3, 2023
07ba657
[GLUTEN-3598][CH] Support to config the hash algorithm for the ch shu…
zzcclp Nov 3, 2023
1bacef5
[GLUTEN-3542][CH] Cancel empty string as null representation when rea…
lhuang09287750 Nov 3, 2023
b1d098d
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231103) (#3605)
lwz9103 Nov 3, 2023
1788834
[VL] Disable columnar table cache by default (#3488)
gaoyangxiaozhu Nov 3, 2023
8ee2bac
[GLUTEN-3602][VL] Sed install_awssdk function to setup-centos8.sh#vel…
dcoliversun Nov 3, 2023
13a0a63
[HOTFIX][CH] Using sparkMurmurHash3_32 function instead of the murmur…
zzcclp Nov 3, 2023
16e2874
[VL] Replace string comparisons with enum for shuffle partitioning (#…
marin-ma Nov 4, 2023
7185955
[VL] Align names of decimal compare functions (#3593)
rui-mo Nov 6, 2023
ba045c7
[CORE] Optimize some methods in agg transformer (#3564)
liujiayi771 Nov 6, 2023
87b3c2e
[CORE] Remove ColumnarAQEShuffleRead (#3607)
ulysses-you Nov 6, 2023
05c7435
[CORE] Add GlutenImplicits to get FallbackSummary easily (#3599)
ulysses-you Nov 6, 2023
bd7abeb
[VL] Refine cmake flags to decrease normal build time (#3485)
Yohahaha Nov 6, 2023
c68e495
[GLUTEN-3521][CH] Fix substring start index can not be 0 (#3524)
KevinyhZou Nov 6, 2023
52552c9
Bug fix csv read field whitespaces (#3554)
KevinyhZou Nov 6, 2023
d57fdf2
Bug fix to_date can not convert 1970-01-01 (#3563)
KevinyhZou Nov 6, 2023
40ef132
[VL] GHA CI: Add cases for aggregation within limited memory (#3622)
zhztheplayer Nov 7, 2023
7308fdb
[CORE] Remove unnecessary case match in getAttrForAggregateExpr (#3629)
liujiayi771 Nov 7, 2023
c470b9b
[CORE] Use collection interface in method parameter and return type (…
liujiayi771 Nov 7, 2023
5d4cac1
[GLUTEN-3141][CH] Rewrite date comparison expression into a more effi…
lgbo-ustc Nov 7, 2023
9c2d057
fix issue 3627 (#3630)
lhuang09287750 Nov 7, 2023
a14baf3
fix issue 3609 (#3614)
lhuang09287750 Nov 7, 2023
e3eff1d
Fix the compile issue when rebasing upstream velox to 11/7 (#3624)
JkSelf Nov 7, 2023
b60fe75
[VL] Fix failed to get iterator exception from file format writer (#3…
JkSelf Nov 8, 2023
427971d
[CORE] Refine maven Spark dependency (#3625)
ulysses-you Nov 8, 2023
2b9891b
[VL] Add find and build glog/gflags to cmake (#3638)
marin-ma Nov 8, 2023
1080543
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231108) (#3640)
lwz9103 Nov 8, 2023
0473d67
[CH] Add using gperftools doc for CH (#3628)
zhanglistar Nov 8, 2023
0e99bb6
[GLUTEN-3625][FOLLOWUP] Fix IDEA load module issue (#3647)
ulysses-you Nov 8, 2023
4a72871
[VL] Allow users to set bloom filter configurations (#3610)
zhli1142015 Nov 8, 2023
3ec7bed
[GLUTEN-3553][CH] Support bucket scan for ch backend (#3618)
zzcclp Nov 8, 2023
a090556
[GLUTEN-3644][CH] Revert the logic to support the custom aggregate fu…
zzcclp Nov 8, 2023
29b5899
[GLUTEN-3572][VL] Remove --arrow_home option and fix "libarrow not fo…
Surbhi-Vijay Nov 8, 2023
2bb522f
Add duplicated sorting keys validation in TopNNode (#3642)
JkSelf Nov 9, 2023
c7780fe
[CI] Improve CI to auto close stale patch (#3634)
zhouyuan Nov 9, 2023
5ee4e34
[VL] Refactor split buffer allocation (#3177)
marin-ma Nov 9, 2023
451eb66
[VL] Support custom cmake deps download url (#3652)
Yohahaha Nov 9, 2023
fee6529
[GLUTEN-1648][VL] Support max_by/min_by aggregate function (#2336)
Yohahaha Nov 9, 2023
745ecb3
[VL] remove unused libhdfs binaries (#3656)
zhouyuan Nov 9, 2023
33003d4
[GLUTEN-3637][VL] Fix whole stage pipeline metric (#3661)
Yohahaha Nov 10, 2023
3ecf596
Fix the duplicated key exception in TopN (#3655)
JkSelf Nov 10, 2023
047c993
[GLUTEN-3625][FOLLOWUP] Recover to the use of hardcode module name (#…
ulysses-you Nov 10, 2023
337b9d9
[VL] Fix for a corner case may break bhj + exchange validation (#3595)
zhztheplayer Nov 10, 2023
6be077d
[CH] Use `max_block_size` controller read mergetree block size (#3669)
loneylee Nov 10, 2023
bd483c9
[GLUTEN-3668][CH] Performance regresses seriously after PR 3169 merge…
zzcclp Nov 10, 2023
797770d
[CORE] Fix ExpandExecTransformer miss input (#3665)
ulysses-you Nov 10, 2023
3a64f2e
[CI] bump checkout action to v4 (#3681)
zhouyuan Nov 11, 2023
5b632f6
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231112) (#3683)
lwz9103 Nov 12, 2023
19800f9
[Gluten-core][VL] Support RewriteTransformer Rules and DeltaLake 2.2 …
YannByron Nov 13, 2023
1f6c6c3
[CORE] Deprecate RegularMemoryConsumer, use TreeMemoryConsumer for bo…
zhztheplayer Nov 13, 2023
139c8a1
[VL] Fix shuffle ignores uncompressed compression configuration (#3672)
marin-ma Nov 13, 2023
70061ac
[GLUTEN-3670][CH] Support escape with excel format (#3674)
loneylee Nov 13, 2023
7cf715b
[VL] Declare IntermediateTypes for specific agg function (#3679)
liujiayi771 Nov 13, 2023
5dabb5b
[VL] Enable Spark functions for `translate`, `add_months`, `array_min…
PHILO-HE Nov 14, 2023
f7d6386
[VL] bump faster xml version to 2.13.5 (#3688)
zhouyuan Nov 14, 2023
0f9f196
[VL] Fix validate issues for Generate and validate CaseWhen expressio…
jackylee-ch Nov 14, 2023
6c38a76
fix issue 3689 (#3691)
lhuang09287750 Nov 14, 2023
bc25d07
[VL] use velox config lib to extract the configurations from spark (#…
FelixYBW Nov 14, 2023
3e42cb0
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231114) (#3698)
lwz9103 Nov 14, 2023
31e354f
[GLUTEN-3705][CORE] Support mapping one custom aggregate function to …
zzcclp Nov 15, 2023
4713b47
Add doc using jemalloc with ch backend. (#3725)
zhanglistar Nov 15, 2023
2449443
[CH] Fix nativeBufferSize assign bug (#3717)
zhanglistar Nov 15, 2023
857872e
[MINOR] Code refactor for ColumnarShuffleExchange constructing #3720
PHILO-HE Nov 15, 2023
36c9208
[GLUTEN-3711][VL] Fix printConf to Support Velox Config #3710
ted-jenks Nov 15, 2023
b2c62cd
[GLUTEN-3635][VL] Clean up docs related to `build-arrow` paths (#3709)
ted-jenks Nov 15, 2023
808e091
[HOTFIX][CH] Fix randomly failed ut for PR#3537 (#3714)
zzcclp Nov 16, 2023
c5ae59d
[GLUTEN-3705][FOLLOW][CH] Set the correct agg schema names after mapp…
zzcclp Nov 16, 2023
8abe4d3
[VL] Fix validation runtime handle leak (#3730)
ulysses-you Nov 16, 2023
fea2222
[VL] rebase velox to 20231114 (#3735)
FelixYBW Nov 16, 2023
a42bca9
[VL][DOC] Move TPC-H/DS workload to tools/ and fix doc link (#3737)
PHILO-HE Nov 16, 2023
aab6dbe
[CH] Optimize aggregate state serialization performance (#3279)
liuneng1994 Nov 16, 2023
6c9c81b
[GLUTEN-1902] Separate the implementations for TranslateTransformer (…
PHILO-HE Nov 16, 2023
510b228
[CI] increase operations per run for stale action (#3699)
zhouyuan Nov 16, 2023
efbe2f3
[CORE] Remove redundant case match for HiveTableScanExecTransformer i…
exmy Nov 17, 2023
5db72cf
reduce libch.so size (#3746)
zhanglistar Nov 17, 2023
4df8832
[CORE] set default spark.version to 3.4.1 (#3727)
zhouyuan Nov 17, 2023
9956fba
[GLUTEN-3548][CH] Bug fix csv file cr at end of line (#3549)
KevinyhZou Nov 17, 2023
f21a96f
[GLUTEN-3692][CH]Fix count no result on empty table (#3693)
KevinyhZou Nov 17, 2023
cb4e7b7
[VL] rebase to velox 2023-11-14 (#3747)
FelixYBW Nov 17, 2023
a662f3f
[VL]Avoid unnecessary filter binding for subfield (#3300)
yma11 Nov 17, 2023
49b8e06
[GLUTEN-3378][CORE] Move getLocalFilesNode logic to scan transformer …
liujiayi771 Nov 17, 2023
23c2e4c
[GLUTEN-3739][VL] Add a config to control velox's file handle cache (…
zhli1142015 Nov 17, 2023
ad23092
[GLUTEN-3668][CH]Fix to_date function performance (#3701)
KevinyhZou Nov 17, 2023
bb88f4f
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20231117) (#3756)
lwz9103 Nov 17, 2023
5f5d18a
[VL] Enable spill-to-disk for partial aggregation (#3697)
zhztheplayer Nov 18, 2023
48496c0
[GLUTEN-3749][VL] fix redundant Velox build (#3759)
zhouyuan Nov 20, 2023
dc75cca
[VL] Activate random kill tasks GHA CI job (#3761)
zhztheplayer Nov 20, 2023
cb18fc1
[CELEBORN] Fix push small data (#3766)
exmy Nov 20, 2023
7553cd1
[VL] [Minor] Fix compile error in debug mode (#3765)
JkSelf Nov 20, 2023
6983898
[CORE] Support get native plan tree string (#3729)
ulysses-you Nov 20, 2023
60fc2a0
[GLUTEN-3779][CH] Fix core dump when executing sql with runtime filte…
zzcclp Nov 20, 2023
92356bc
Add config to specify the window type in velox backend (#3703)
JkSelf Nov 21, 2023
1b489d1
[GLUTEN-3715] [VL] Add GCS support in velox backend (#2634)
tigrux Nov 21, 2023
b22c862
[VL] update pacakge.sh for spark34 (#3786)
zhouyuan Nov 21, 2023
018da4c
[VL] Respect spark.gluten.sql.debug in native side (#3748)
Yohahaha Nov 21, 2023
f29077e
[GLUTEN-3719][VL] Introduce VeloxIntermediateData to adjust agg func …
liujiayi771 Nov 21, 2023
7c640bd
Make debug behavior clear (#3793)
ulysses-you Nov 21, 2023
0d4e43c
[VL] Fix parquet writer passing wrong param (#3790)
marin-ma Nov 21, 2023
30865e5
[VL] Ban flaky unit tests (#3798)
zhouyuan Nov 21, 2023
b40a5f0
[CORE][VL] Add naitve plan string and plan with stats (#3787)
ulysses-you Nov 21, 2023
5966579
[VL] Add configuration for generating 4k window size gzip parquet fil…
marin-ma Nov 22, 2023
fc04d0c
[VL] Fix ORC related failed UT
chenxu14 Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[VL] Copy-compress small partition buffer (#3434)
marin-ma authored Oct 19, 2023
commit 048d981e1cee649203765983a8af63bfd8234027
1 change: 1 addition & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -194,6 +194,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
memory/ArrowMemoryPool.cc
memory/ColumnarBatch.cc
operators/writer/ArrowWriter.cc
shuffle/options.cc
shuffle/ShuffleReader.cc
shuffle/ShuffleWriter.cc
shuffle/Partitioner.cc
2 changes: 1 addition & 1 deletion cpp/core/compute/ExecutionCtx.h
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ class ExecutionCtx : public std::enable_shared_from_this<ExecutionCtx> {

virtual ResourceHandle createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ReaderOptions options,
ShuffleReaderOptions options,
arrow::MemoryPool* pool,
MemoryManager* memoryManager) = 0;
virtual std::shared_ptr<ShuffleReader> getShuffleReader(ResourceHandle) = 0;
8 changes: 3 additions & 5 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
@@ -810,7 +810,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
}

shuffleWriterOptions.task_attempt_id = (int64_t)taskAttemptId;
shuffleWriterOptions.buffer_compress_threshold = bufferCompressThreshold;
shuffleWriterOptions.compression_threshold = bufferCompressThreshold;

auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, JNI_FALSE);
auto partitionWriterType = std::string(partitionWriterTypeC);
@@ -984,20 +984,18 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper
jlong cSchema,
jlong memoryManagerHandle,
jstring compressionType,
jstring compressionBackend,
jstring compressionMode) {
jstring compressionBackend) {
JNI_METHOD_START
auto ctx = gluten::getExecutionCtx(env, wrapper);
auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);

auto pool = memoryManager->getArrowMemoryPool();
ReaderOptions options = ReaderOptions::defaults();
ShuffleReaderOptions options = ShuffleReaderOptions::defaults();
options.ipc_read_options.memory_pool = pool;
options.ipc_read_options.use_threads = false;
if (compressionType != nullptr) {
options.compression_type = getCompressionType(env, compressionType);
options.codec_backend = getCodecBackend(env, compressionBackend);
options.compression_mode = getCompressionMode(env, compressionMode);
}
std::shared_ptr<arrow::Schema> schema =
gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct ArrowSchema*>(cSchema)));
1 change: 1 addition & 0 deletions cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
#pragma once

#include "shuffle/ShuffleWriter.h"
#include "shuffle/options.h"

namespace gluten {

13 changes: 6 additions & 7 deletions cpp/core/shuffle/ShuffleReader.cc
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ using namespace gluten;
class ShuffleReaderOutStream : public ColumnarBatchIterator {
public:
ShuffleReaderOutStream(
const ReaderOptions& options,
const ShuffleReaderOptions& options,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::io::InputStream>& in,
const std::function<void(int64_t)> ipcTimeAccumulator)
@@ -65,7 +65,7 @@ class ShuffleReaderOutStream : public ColumnarBatchIterator {
}

private:
ReaderOptions options_;
ShuffleReaderOptions options_;
std::shared_ptr<arrow::io::InputStream> in_;
std::function<void(int64_t)> ipcTimeAccumulator_;
std::shared_ptr<arrow::Schema> writeSchema_;
@@ -74,11 +74,10 @@ class ShuffleReaderOutStream : public ColumnarBatchIterator {

namespace gluten {

ReaderOptions ReaderOptions::defaults() {
return {};
}

ShuffleReader::ShuffleReader(std::shared_ptr<arrow::Schema> schema, ReaderOptions options, arrow::MemoryPool* pool)
ShuffleReader::ShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ShuffleReaderOptions options,
arrow::MemoryPool* pool)
: pool_(pool), options_(std::move(options)), schema_(schema) {}

std::shared_ptr<ResultIterator> ShuffleReader::readStream(std::shared_ptr<arrow::io::InputStream> in) {
14 changes: 3 additions & 11 deletions cpp/core/shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
@@ -23,22 +23,14 @@
#include <arrow/ipc/options.h>

#include "compute/ResultIterator.h"
#include "options.h"
#include "utils/compression.h"

namespace gluten {

struct ReaderOptions {
arrow::ipc::IpcReadOptions ipc_read_options = arrow::ipc::IpcReadOptions::Defaults();
arrow::Compression::type compression_type = arrow::Compression::type::LZ4_FRAME;
CodecBackend codec_backend = CodecBackend::NONE;
CompressionMode compression_mode = CompressionMode::BUFFER;

static ReaderOptions defaults();
};

class ShuffleReader {
public:
explicit ShuffleReader(std::shared_ptr<arrow::Schema> schema, ReaderOptions options, arrow::MemoryPool* pool);
explicit ShuffleReader(std::shared_ptr<arrow::Schema> schema, ShuffleReaderOptions options, arrow::MemoryPool* pool);

virtual ~ShuffleReader() = default;

@@ -67,7 +59,7 @@ class ShuffleReader {
int64_t ipcTime_ = 0;
int64_t deserializeTime_ = 0;

ReaderOptions options_;
ShuffleReaderOptions options_;

private:
std::shared_ptr<arrow::Schema> schema_;
2 changes: 1 addition & 1 deletion cpp/core/shuffle/ShuffleSchema.h
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ inline std::shared_ptr<arrow::Schema> toWriteSchema(arrow::Schema& schema) {
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
fields.emplace_back(std::make_shared<arrow::Field>("nullBuffer" + std::to_string(i), arrow::large_utf8()));
fields.emplace_back(std::make_shared<arrow::Field>("offsetBuffer" + std::to_string(i), arrow::large_utf8()));
fields.emplace_back(std::make_shared<arrow::Field>("lengthBuffer" + std::to_string(i), arrow::large_utf8()));
fields.emplace_back(std::make_shared<arrow::Field>("valueBuffer" + std::to_string(i), arrow::large_utf8()));
} break;
case arrow::StructType::type_id:
4 changes: 0 additions & 4 deletions cpp/core/shuffle/ShuffleWriter.cc
Original file line number Diff line number Diff line change
@@ -31,10 +31,6 @@ namespace gluten {
#define SPLIT_BUFFER_SIZE 16 * 1024 * 1024
#endif

ShuffleWriterOptions ShuffleWriterOptions::defaults() {
return {};
}

std::shared_ptr<arrow::Schema> ShuffleWriter::writeSchema() {
if (writeSchema_ != nullptr) {
return writeSchema_;
38 changes: 1 addition & 37 deletions cpp/core/shuffle/ShuffleWriter.h
Original file line number Diff line number Diff line change
@@ -23,47 +23,11 @@

#include "memory/ArrowMemoryPool.h"
#include "memory/ColumnarBatch.h"
#include "shuffle/options.h"
#include "utils/compression.h"

namespace gluten {

namespace {
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultBufferCompressThreshold = 1024;
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
} // namespace

enum PartitionWriterType { kLocal, kCeleborn };

struct ShuffleWriterOptions {
int32_t buffer_size = kDefaultShuffleWriterBufferSize;
int32_t push_buffer_max_size = kDefaultShuffleWriterBufferSize;
int32_t num_sub_dirs = kDefaultNumSubDirs;
int32_t buffer_compress_threshold = kDefaultBufferCompressThreshold;
double buffer_realloc_threshold = kDefaultBufferReallocThreshold;
arrow::Compression::type compression_type = arrow::Compression::LZ4_FRAME;
CodecBackend codec_backend = CodecBackend::NONE;
CompressionMode compression_mode = CompressionMode::BUFFER;
bool buffered_write = false;
bool write_eos = true;

std::string data_file;
PartitionWriterType partition_writer_type = kLocal;

int64_t thread_id = -1;
int64_t task_attempt_id = -1;

arrow::MemoryPool* memory_pool;

arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();

std::string partitioning_name;

static ShuffleWriterOptions defaults();
};

class ShuffleMemoryPool : public arrow::MemoryPool {
public:
ShuffleMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {}
13 changes: 0 additions & 13 deletions cpp/core/shuffle/Utils.h
Original file line number Diff line number Diff line change
@@ -30,19 +30,6 @@ namespace gluten {

const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS";

#define EVAL_START(name, thread_id) \
// auto eval_start = std::chrono::duration_cast<std::chrono::nanoseconds>( \
std::chrono::system_clock::now().time_since_epoch()) \
.count();

#define EVAL_END(name, thread_id, task_attempt_id) \
// std::cout << "xgbtck " << name << " " << eval_start << " " \
<< std::chrono::duration_cast<std::chrono::nanoseconds>( \
std::chrono::system_clock::now().time_since_epoch()) \
.count() - \
eval_start \
<< " " << thread_id << " " << task_attempt_id << std::endl;

std::string generateUuid();

std::string getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId);
25 changes: 25 additions & 0 deletions cpp/core/shuffle/options.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "options.h"
gluten::ShuffleReaderOptions gluten::ShuffleReaderOptions::defaults() {
return {};
}

gluten::ShuffleWriterOptions gluten::ShuffleWriterOptions::defaults() {
return {};
}
68 changes: 68 additions & 0 deletions cpp/core/shuffle/options.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <arrow/ipc/options.h>
#include "utils/compression.h"

namespace gluten {

static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;

enum PartitionWriterType { kLocal, kCeleborn };

struct ShuffleReaderOptions {
arrow::ipc::IpcReadOptions ipc_read_options = arrow::ipc::IpcReadOptions::Defaults();
arrow::Compression::type compression_type = arrow::Compression::type::LZ4_FRAME;
CodecBackend codec_backend = CodecBackend::NONE;

static ShuffleReaderOptions defaults();
};

struct ShuffleWriterOptions {
int32_t buffer_size = kDefaultShuffleWriterBufferSize;
int32_t push_buffer_max_size = kDefaultShuffleWriterBufferSize;
int32_t num_sub_dirs = kDefaultNumSubDirs;
int32_t compression_threshold = kDefaultCompressionThreshold;
double buffer_realloc_threshold = kDefaultBufferReallocThreshold;
arrow::Compression::type compression_type = arrow::Compression::LZ4_FRAME;
CodecBackend codec_backend = CodecBackend::NONE;
CompressionMode compression_mode = CompressionMode::BUFFER;
bool buffered_write = false;
bool write_eos = true;

std::string data_file;
PartitionWriterType partition_writer_type = kLocal;

int64_t thread_id = -1;
int64_t task_attempt_id = -1;

arrow::MemoryPool* memory_pool;

arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();

std::string partitioning_name;

static ShuffleWriterOptions defaults();
};

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxExecutionCtx.cc
Original file line number Diff line number Diff line change
@@ -219,7 +219,7 @@ void VeloxExecutionCtx::releaseDatasource(ResourceHandle handle) {

ResourceHandle VeloxExecutionCtx::createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ReaderOptions options,
ShuffleReaderOptions options,
arrow::MemoryPool* pool,
MemoryManager* memoryManager) {
auto ctxVeloxPool = getLeafVeloxPool(memoryManager);
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxExecutionCtx.h
Original file line number Diff line number Diff line change
@@ -107,7 +107,7 @@ class VeloxExecutionCtx final : public ExecutionCtx {

ResourceHandle createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ReaderOptions options,
ShuffleReaderOptions options,
arrow::MemoryPool* pool,
MemoryManager* memoryManager) override;
std::shared_ptr<ShuffleReader> getShuffleReader(ResourceHandle handle) override;
Loading