Skip to content

Commit

Permalink
save hash mem code
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Sep 30, 2024
1 parent b6b8e4f commit 083b996
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 3 deletions.
28 changes: 28 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table) {
// The build side hash key column maybe no need output, but we need to keep the column in block
// because it is used to compare with probe side hash key column
if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
p._should_keep_column_flags[_build_col_ids[0]] = true;
}
// release the memory of unused column in probe stage
_shared_state->build_block->clear_column_mem_not_keep(p._should_keep_column_flags,
p._shared_hash_table_context);
}

if (_should_build_hash_table && p._shared_hashtable_controller) {
p._shared_hashtable_controller->signal_finish(p.node_id());
}
Expand Down Expand Up @@ -386,7 +397,9 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
default:
_shared_state->hash_table_variants
->emplace<vectorized::SerializedHashTableContext>();
return;
}
p._should_keep_hash_key_column = true;
return;
}

Expand Down Expand Up @@ -433,6 +446,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);

if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
_hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
}

const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::RIGHT_ANTI_JOIN;
Expand Down Expand Up @@ -494,6 +511,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
_shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
}
}
auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) {
for (const auto& tuple_desc : tuple_descs) {
for (const auto& slot_desc : tuple_desc->slots()) {
output_slot_flags.emplace_back(
_hash_output_slot_ids.empty() ||
std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
slot_desc->id()) != _hash_output_slot_ids.end());
}
}
};
init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags);
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_build_expr_ctxs, state);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class HashJoinBuildSinkOperatorX final
const std::vector<TExpr> _partition_exprs;

const bool _need_local_merge;

std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _should_keep_column_flags;
bool _should_keep_hash_key_column = false;
};

template <class HashTableContext>
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,13 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
mcol.size(), _right_col_len, _right_col_idx);
}
for (size_t j = 0; j < _right_col_len; ++j) {
const auto& column = *_build_block->safe_get_by_position(j).column;
mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
_build_indexs.data() + block_size);
if (_right_output_slot_flags->at(j)) {
const auto& column = *_build_block->safe_get_by_position(j).column;
mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
_build_indexs.data() + block_size);
} else {
mcol[j + _right_col_idx]->resize(block_size);
}
}

// just resize the left table column in case with other conjunct to make block size is not zero
Expand Down
15 changes: 15 additions & 0 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,21 @@ void Block::erase_tmp_columns() noexcept {
}
}

void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
bool need_keep_first) {
if (data.size() >= column_keep_flags.size()) auto origin_rows = rows();
for (size_t i = 0; i < column_keep_flags.size(); ++i) {
if (!column_keep_flags[i]) {
data[i].column = data[i].column->clone_empty();
}
}

if (need_keep_first) {
data[0].column->resize(origin_rows);
}
}
}

void Block::swap(Block& other) noexcept {
SCOPED_SKIP_MEMORY_CHECK();
data.swap(other.data);
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ class Block {
// we built some temporary columns into block
void erase_tmp_columns() noexcept;

void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
bool need_keep_first);

private:
void erase_impl(size_t position);
};
Expand Down

0 comments on commit 083b996

Please sign in to comment.