save hash mem code #41388

Open · wants to merge 2 commits into base: master
31 changes: 31 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -113,6 +113,20 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
    auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
    Defer defer {[&]() {
        if (_should_build_hash_table && !p._shared_hashtable_controller) {
            // The build-side hash key column may not need to be output, but we have to keep it in
            // the block because it is compared against the probe-side hash key column.
            if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
                p._should_keep_column_flags[_build_col_ids[0]] = true;
            }

            if (_shared_state->build_block) {
                // Release the memory of columns that are not needed during the probe stage.
                _shared_state->build_block->clear_column_mem_not_keep(
                        p._should_keep_column_flags, true);
            }
        }

        if (_should_build_hash_table && p._shared_hashtable_controller) {
            p._shared_hashtable_controller->signal_finish(p.node_id());
        }
@@ -386,7 +400,9 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
    default:
        _shared_state->hash_table_variants
                ->emplace<vectorized::SerializedHashTableContext>();
        return;
    }
    p._should_keep_hash_key_column = true;
    return;
}

@@ -433,6 +449,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
    RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
    DCHECK(tnode.__isset.hash_join_node);

    if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
        _hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
    }

    const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                                   _join_op == TJoinOp::FULL_OUTER_JOIN ||
                                   _join_op == TJoinOp::RIGHT_ANTI_JOIN;
@@ -494,6 +514,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
            _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
        }
    }
    auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) {
        for (const auto& tuple_desc : tuple_descs) {
            for (const auto& slot_desc : tuple_desc->slots()) {
                // Keep the column if no hash output slots are given, or if this slot id is listed.
                output_slot_flags.emplace_back(
                        _hash_output_slot_ids.empty() ||
                        std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
                                  slot_desc->id()) != _hash_output_slot_ids.end());
            }
        }
    };
    init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags);
    RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
    return vectorized::VExpr::open(_build_expr_ctxs, state);
}
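For context only (not part of the diff): the keep-column flags computed in open() are a per-slot bitmap derived from hash_output_slot_ids. A minimal standalone sketch of that mapping, with simplified types and hypothetical names such as build_keep_flags:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using SlotId = int32_t;

// Returns one flag per slot: true when the slot must stay materialized,
// i.e. when no output list is given or the slot id appears in it.
std::vector<bool> build_keep_flags(const std::vector<SlotId>& all_slots,
                                   const std::vector<SlotId>& hash_output_slot_ids) {
    std::vector<bool> flags;
    flags.reserve(all_slots.size());
    for (SlotId id : all_slots) {
        flags.push_back(hash_output_slot_ids.empty() ||
                        std::find(hash_output_slot_ids.begin(), hash_output_slot_ids.end(), id) !=
                                hash_output_slot_ids.end());
    }
    return flags;
}

int main() {
    // Build side has slots 0..3, but only slot 1 is in the hash output list.
    auto flags = build_keep_flags({0, 1, 2, 3}, {1});
    for (size_t i = 0; i < flags.size(); ++i) {
        std::cout << "slot " << i << " keep=" << flags[i] << '\n';
    }
}

An empty hash_output_slot_ids list keeps every build-side column, which matches the emplace_back condition in the lambda above.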
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
@@ -173,6 +173,10 @@ class HashJoinBuildSinkOperatorX final
    const std::vector<TExpr> _partition_exprs;

    const bool _need_local_merge;

    std::vector<SlotId> _hash_output_slot_ids;
    std::vector<bool> _should_keep_column_flags;
    bool _should_keep_hash_key_column = false;
};

template <class HashTableContext>
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -640,9 +640,13 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
                                    mcol.size(), _right_col_len, _right_col_idx);
    }
    for (size_t j = 0; j < _right_col_len; ++j) {
        const auto& column = *_build_block->safe_get_by_position(j).column;
        mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
                                                      _build_indexs.data() + block_size);
        if (_right_output_slot_flags->at(j)) {
            const auto& column = *_build_block->safe_get_by_position(j).column;
            mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
                                                          _build_indexs.data() + block_size);
        } else {
            // Column is not in the hash output slots: only resize it so every output
            // column reports the same row count, without copying build-side data.
            mcol[j + _right_col_idx]->resize(block_size);
        }
    }

    // Just resize the left table columns when there are other conjuncts, so the block size is not zero.
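For illustration only: a toy version of the changed probe loop, showing why columns outside the hash output slots can merely be resized instead of gathered from the build block (simplified containers, made-up data):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Simplified picture of the probe-side output loop: columns that are in the hash
// output slots get real data gathered by index; the rest are only resized so every
// output column reports the same row count.
int main() {
    std::vector<std::vector<int>> build_columns = {{10, 20, 30}, {1, 2, 3}};
    std::vector<bool> right_output_slot_flags = {true, false}; // second column not needed
    std::vector<uint32_t> build_indexs = {2, 0, 2};            // rows picked by the probe

    std::vector<std::vector<int>> out(build_columns.size());
    for (size_t j = 0; j < build_columns.size(); ++j) {
        if (right_output_slot_flags[j]) {
            for (uint32_t idx : build_indexs) {
                out[j].push_back(build_columns[j][idx]); // analogous to insert_indices_from
            }
        } else {
            out[j].resize(build_indexs.size()); // placeholder rows, no data copied
        }
    }
    std::cout << out[0][0] << ' ' << out[0][1] << ' ' << out[0][2] << '\n'; // 30 10 30
    std::cout << out[1].size() << '\n';                                     // 3
}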
18 changes: 18 additions & 0 deletions be/src/vec/core/block.cpp
@@ -749,6 +749,24 @@ void Block::erase_tmp_columns() noexcept {
    }
}

void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
                                      bool need_keep_first) {
    if (data.size() >= column_keep_flags.size()) {
        auto origin_rows = rows();
        for (size_t i = 0; i < column_keep_flags.size(); ++i) {
            if (!column_keep_flags[i]) {
                // Drop the column data by replacing it with an empty clone of the same type.
                data[i].column = data[i].column->clone_empty();
            }
        }

        if (need_keep_first && data[0].column->size() != origin_rows) {
            // Keep the first column at the original row count so Block::rows() is unchanged.
            auto first_column = data[0].column->clone_empty();
            first_column->resize(origin_rows);
            data[0].column = std::move(first_column);
        }
    }
}

void Block::swap(Block& other) noexcept {
    SCOPED_SKIP_MEMORY_CHECK();
    data.swap(other.data);
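The new Block::clear_column_mem_not_keep frees every column whose flag is false by swapping in an empty clone, optionally keeping the first column sized so the row count survives. Below is a minimal standalone imitation of that behavior using plain vectors instead of Doris column types; all names are illustrative and not taken from the codebase:

#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// Stand-in for a column: a vector of ints behind a shared pointer,
// loosely mirroring how a block holds column pointers.
using ColumnPtr = std::shared_ptr<std::vector<int>>;

void clear_column_mem_not_keep(std::vector<ColumnPtr>& columns,
                               const std::vector<bool>& keep_flags, bool need_keep_first) {
    if (columns.size() < keep_flags.size()) {
        return; // flag vector describes more columns than we have; do nothing
    }
    const size_t origin_rows = columns.empty() ? 0 : columns[0]->size();
    for (size_t i = 0; i < keep_flags.size(); ++i) {
        if (!keep_flags[i]) {
            columns[i] = std::make_shared<std::vector<int>>(); // drop the data ("clone_empty")
        }
    }
    if (need_keep_first && columns[0]->size() != origin_rows) {
        // Keep the first column's row count so the "block" still reports the original size.
        columns[0] = std::make_shared<std::vector<int>>(origin_rows);
    }
}

int main() {
    std::vector<ColumnPtr> cols = {std::make_shared<std::vector<int>>(4, 1),
                                   std::make_shared<std::vector<int>>(4, 2),
                                   std::make_shared<std::vector<int>>(4, 3)};
    clear_column_mem_not_keep(cols, {false, true, false}, /*need_keep_first=*/true);
    for (const auto& c : cols) {
        std::cout << "rows=" << c->size() << '\n'; // prints 4, 4, 0
    }
}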
3 changes: 3 additions & 0 deletions be/src/vec/core/block.h
@@ -404,6 +404,9 @@ class Block {
    // We built some temporary columns into the block; erase them here.
    void erase_tmp_columns() noexcept;

    // Release the memory of columns whose keep flag is false. When need_keep_first is true,
    // the first column keeps its original row count so the block still reports the right size.
    void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
                                   bool need_keep_first);

private:
    void erase_impl(size_t position);
};