diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8f7b176a979a4db..a2610d1e8298e58 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -113,6 +113,17 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
 Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
+        if (_should_build_hash_table) {
+            // The build side hash key column may not need to be output, but we have to keep it in
+            // the block because it is compared against the probe side hash key column.
+            if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
+                p._should_keep_column_flags[_build_col_ids[0]] = true;
+            }
+            // Release the memory of columns that are not needed during the probe stage.
+            _shared_state->build_block->clear_column_mem_not_keep(
+                    p._should_keep_column_flags, p._shared_hash_table_context != nullptr);
+        }
+
         if (_should_build_hash_table && p._shared_hashtable_controller) {
             p._shared_hashtable_controller->signal_finish(p.node_id());
         }
@@ -386,7 +397,9 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
     default:
         _shared_state->hash_table_variants
                 ->emplace<SerializedHashTableContext>();
+        return;
     }
+    p._should_keep_hash_key_column = true;
     return;
 }
@@ -433,6 +446,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
     RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::init(tnode, state));
     DCHECK(tnode.__isset.hash_join_node);
 
+    if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
+        _hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
+    }
+
     const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                                    _join_op == TJoinOp::FULL_OUTER_JOIN ||
                                    _join_op == TJoinOp::RIGHT_ANTI_JOIN;
@@ -494,6 +511,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
             _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
         }
     }
+    auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) {
+        for (const auto& tuple_desc : tuple_descs) {
+            for (const auto& slot_desc : tuple_desc->slots()) {
+                output_slot_flags.emplace_back(
+                        _hash_output_slot_ids.empty() ||
+                        std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
+                                  slot_desc->id()) != _hash_output_slot_ids.end());
+            }
+        }
+    };
+    init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags);
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
     return vectorized::VExpr::open(_build_expr_ctxs, state);
 }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index cf677833fb5b64e..a544cdcf4563a49 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -173,6 +173,10 @@ class HashJoinBuildSinkOperatorX final
     const std::vector<TExpr> _partition_exprs;
 
     const bool _need_local_merge;
+
+    std::vector<SlotId> _hash_output_slot_ids;
+    std::vector<bool> _should_keep_column_flags;
+    bool _should_keep_hash_key_column = false;
 };
 
 template <class HashTableContext>
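For reference, here is a minimal standalone sketch of the keep-flag computation that the init_keep_column_flags lambda performs in HashJoinBuildSinkOperatorX::open(): a build-side slot is kept when hash_output_slot_ids is empty (output everything) or when its id appears in that list. The compute_keep_flags helper and the plain-int SlotId are illustrative assumptions, not part of the patch.

// Sketch only -- not Doris code. SlotId is simplified to int and the tuple
// descriptors are modeled as plain vectors of slot ids.
#include <algorithm>
#include <cassert>
#include <vector>

using SlotId = int;

// Mirrors the logic of the init_keep_column_flags lambda: a column is kept when
// hash_output_slot_ids is empty or contains the slot id of that column.
std::vector<bool> compute_keep_flags(const std::vector<std::vector<SlotId>>& tuple_slots,
                                     const std::vector<SlotId>& hash_output_slot_ids) {
    std::vector<bool> keep_flags;
    for (const auto& slots : tuple_slots) {
        for (SlotId id : slots) {
            keep_flags.push_back(hash_output_slot_ids.empty() ||
                                 std::find(hash_output_slot_ids.begin(),
                                           hash_output_slot_ids.end(),
                                           id) != hash_output_slot_ids.end());
        }
    }
    return keep_flags;
}

int main() {
    // One tuple with slots {0, 1, 2}; only slot 1 is listed in hash_output_slot_ids.
    auto flags = compute_keep_flags({{0, 1, 2}}, {1});
    assert(!flags[0] && flags[1] && !flags[2]);

    // An empty hash_output_slot_ids list keeps every column.
    auto all = compute_keep_flags({{0, 1, 2}}, {});
    assert(all[0] && all[1] && all[2]);
    return 0;
}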
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 653cc8ab4473dd1..6bb5a2006ab9b01 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -640,9 +640,13 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
                            mcol.size(), _right_col_len, _right_col_idx);
     }
     for (size_t j = 0; j < _right_col_len; ++j) {
-        const auto& column = *_build_block->safe_get_by_position(j).column;
-        mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
-                                                      _build_indexs.data() + block_size);
+        if (_right_output_slot_flags->at(j)) {
+            const auto& column = *_build_block->safe_get_by_position(j).column;
+            mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
+                                                          _build_indexs.data() + block_size);
+        } else {
+            mcol[j + _right_col_idx]->resize(block_size);
+        }
     }
 
     // just resize the left table column in case with other conjunct to make block size is not zero
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 971e48d52b6d54c..a6b314b817fea62 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -749,6 +749,21 @@ void Block::erase_tmp_columns() noexcept {
     }
 }
 
+void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
+                                      bool need_keep_first) {
+    if (data.size() >= column_keep_flags.size()) {
+        auto origin_rows = rows();
+        for (size_t i = 0; i < column_keep_flags.size(); ++i) {
+            if (!column_keep_flags[i]) {
+                data[i].column = data[i].column->clone_empty();
+            }
+        }
+
+        if (need_keep_first) {
+            data[0].column->assume_mutable()->resize(origin_rows);
+        }
+    }
+}
+
 void Block::swap(Block& other) noexcept {
     SCOPED_SKIP_MEMORY_CHECK();
     data.swap(other.data);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 108cb5e1c9fdf55..c082b731dd928f0 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -404,6 +404,9 @@ class Block {
     // we built some temporary columns into block
     void erase_tmp_columns() noexcept;
 
+    void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
+                                   bool need_keep_first);
+
 private:
     void erase_impl(size_t position);
 };
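To make the memory-release semantics of the new Block::clear_column_mem_not_keep easier to follow, below is a self-contained toy model. The ToyColumn and ToyBlock types are assumptions for illustration only and ignore Doris's copy-on-write ColumnPtr machinery: columns whose keep flag is false drop their payload, and the first column is optionally resized back so the block still reports its original row count.

// Sketch only -- not Doris code. Real columns are COW ColumnPtr objects; here a
// column is just a vector of ints so the release/keep behaviour is easy to see.
#include <cassert>
#include <cstddef>
#include <vector>

struct ToyColumn {
    std::vector<int> data;
    size_t size() const { return data.size(); }
    void resize(size_t n) { data.resize(n); }
};

struct ToyBlock {
    std::vector<ToyColumn> columns;

    size_t rows() const { return columns.empty() ? 0 : columns[0].size(); }

    // Same contract as Block::clear_column_mem_not_keep: free columns whose flag
    // is false, optionally keeping the first column's row count intact.
    void clear_column_mem_not_keep(const std::vector<bool>& keep_flags, bool need_keep_first) {
        if (columns.size() < keep_flags.size()) {
            return;
        }
        const size_t origin_rows = rows();
        for (size_t i = 0; i < keep_flags.size(); ++i) {
            if (!keep_flags[i]) {
                std::vector<int>().swap(columns[i].data);  // actually release the memory
            }
        }
        if (need_keep_first) {
            columns[0].resize(origin_rows);  // preserve the block's row count
        }
    }
};

int main() {
    ToyBlock block {{ToyColumn {{1, 2, 3}}, ToyColumn {{4, 5, 6}}, ToyColumn {{7, 8, 9}}}};
    // Keep only the middle column, e.g. because it is the hash key column.
    block.clear_column_mem_not_keep({false, true, false}, /*need_keep_first=*/true);
    assert(block.rows() == 3);             // row count preserved via the first column
    assert(block.columns[1].size() == 3);  // kept column untouched
    assert(block.columns[2].size() == 0);  // unused column released
    return 0;
}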