Reserve space for hash table [cmu-db#3]
eric-haibin-lin committed Dec 2, 2016
1 parent 9320574 commit b1d65e1
Showing 3 changed files with 35 additions and 45 deletions.
2 changes: 2 additions & 0 deletions src/include/executor/parallel_hash_executor.h
@@ -85,6 +85,8 @@ class ParallelHashExecutor : public AbstractExecutor, public Trackable {
total_num_tuples_.fetch_add(num_tuples);
}

+inline void Reserve(size_t num_tuples) { hash_table_.reserve(num_tuples); }

protected:
// Initialize the values of the hash keys from plan node
void InitHashKeys();
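The new Reserve() method simply forwards to reserve() on the executor's hash table so the build side can be sized once, up front. As a minimal sketch of the pattern this enables (not the executor's actual code; std::unordered_map stands in for the concurrent hash_table_ member):

#include <cstddef>
#include <unordered_map>
#include <vector>

struct Tuple {
  int key;
  int value;
};

// Reserve for the known input size before the build loop so the table does
// not rehash while it is being populated.
std::unordered_map<int, int> BuildHashTable(const std::vector<Tuple> &tuples) {
  std::unordered_map<int, int> table;
  table.reserve(tuples.size());  // analogous to ParallelHashExecutor::Reserve
  for (const auto &t : tuples) {
    table.emplace(t.key, t.value);
  }
  return table;
}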
19 changes: 11 additions & 8 deletions src/planner/parallel_hash_plan.cpp
@@ -40,10 +40,13 @@ ParallelHashPlan::DependencyCompleteHelper(
num_partitions = 1;
}

+size_t total_num_tuples = 0;

// Group the results based on partitions
executor::LogicalTileLists partitioned_result_tile_lists(num_partitions);
for (auto &result_tile_list : *(task->result_tile_lists.get())) {
for (auto &result_tile : result_tile_list) {
+total_num_tuples += result_tile->GetTupleCount();
size_t partition = result_tile->GetPartition();
if (force_single_partition) {
partition = 0;
@@ -84,9 +87,12 @@ ParallelHashPlan::DependencyCompleteHelper(
std::shared_ptr<executor::ParallelHashExecutor> hash_executor(
new executor::ParallelHashExecutor(this, nullptr));
hash_executor->SetNumTasks(num_tasks);
+// Reserve space for hash table
+hash_executor->Reserve(total_num_tuples);
hash_executor->Init();
LOG_DEBUG("Number of tuples from child: %d", (int)total_num_tuples);

-// TODO Add dummy child node to retrieve result from
+// TODO Add dummy child node to retrieve result from?
// hash_executor.AddChild(&right_table_scan_executor);

for (size_t task_id = 0; task_id < num_tasks; task_id++) {
@@ -112,15 +118,12 @@ ParallelHashPlan::DependencyCompleteHelper(
for (auto task : tasks) {
executor::HashTask *hash_task =
static_cast<executor::HashTask *>(task.get());
-if (force_single_partition) {
-executor::ParallelHashExecutor::ExecuteTask(task);
-} else {
-partitioned_executor_thread_pool.SubmitTask(
-hash_task->partition_id, executor::ParallelHashExecutor::ExecuteTask,
-std::move(task));
-}
+partitioned_executor_thread_pool.SubmitTask(
+hash_task->partition_id, executor::ParallelHashExecutor::ExecuteTask,
+std::move(task));
}

LOG_DEBUG("%d hash tasks submitted", (int)tasks.size());
// XXX This is a hack to let join test pass
hash_executor->SetChildTiles(result_tile_lists);
return std::move(hash_executor);
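DependencyCompleteHelper now sums the child's tuple counts while regrouping its result tiles by partition, and hands that total to the hash executor before any hash task is submitted. A simplified sketch of the count-then-reserve flow, using illustrative stand-in types rather than Peloton's real LogicalTile interfaces:

#include <cstddef>
#include <vector>

// Stand-in for executor::LogicalTile; only the tuple count matters here.
struct LogicalTile {
  std::size_t tuple_count;
  std::size_t GetTupleCount() const { return tuple_count; }
};
using LogicalTileList = std::vector<LogicalTile>;

// Single pass over the child's result tiles to learn the total input size.
std::size_t CountChildTuples(const std::vector<LogicalTileList> &result_tile_lists) {
  std::size_t total_num_tuples = 0;
  for (const auto &tile_list : result_tile_lists) {
    for (const auto &tile : tile_list) {
      total_num_tuples += tile.GetTupleCount();
    }
  }
  return total_num_tuples;
}

The total is then passed to the executor (hash_executor->Reserve(total_num_tuples)) before Init() and before the per-partition hash tasks go to the thread pool, so concurrent inserts do not trigger rehashing.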
59 changes: 22 additions & 37 deletions test/executor/parallel_join_test.cpp
@@ -171,11 +171,13 @@ TEST_F(ParallelJoinTests, JoinPredicateTest) {

void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
oid_t join_test_type) {
+// start executor pool
+ExecutorPoolHarness::GetInstance();

//===--------------------------------------------------------------------===//
// Mock table scan executors
//===--------------------------------------------------------------------===//

-MockExecutor left_table_scan_executor, right_table_scan_executor;
+MockExecutor left_table_scan_executor;

// Create a table and wrap it in logical tile
size_t tile_group_size = TESTS_TUPLES_PER_TILEGROUP;
@@ -387,41 +389,6 @@ void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
// Differ based on join algorithm
switch (join_algorithm) {

-case PLAN_NODE_TYPE_MERGEJOIN: {
-// Create join clauses
-std::vector<planner::MergeJoinPlan::JoinClause> join_clauses;
-join_clauses = JoinTestsUtil::CreateJoinClauses();
-
-// Create merge join plan node
-planner::MergeJoinPlan merge_join_node(join_type, std::move(predicate),
-std::move(projection), schema,
-join_clauses);
-
-// Construct the merge join executor
-executor::MergeJoinExecutor merge_join_executor(&merge_join_node,
-nullptr);
-
-// Construct the executor tree
-merge_join_executor.AddChild(&left_table_scan_executor);
-merge_join_executor.AddChild(&right_table_scan_executor);
-
-// Run the merge join executor
-EXPECT_TRUE(merge_join_executor.Init());
-while (merge_join_executor.Execute() == true) {
-std::unique_ptr<executor::LogicalTile> result_logical_tile(
-merge_join_executor.GetOutput());
-
-if (result_logical_tile != nullptr) {
-result_tuple_count += result_logical_tile->GetTupleCount();
-tuples_with_null += JoinTestsUtil::CountTuplesWithNullFields(
-result_logical_tile.get());
-JoinTestsUtil::ValidateJoinLogicalTile(result_logical_tile.get());
-LOG_TRACE("%s", result_logical_tile->GetInfo().c_str());
-}
-}
-
-} break;
-
case PLAN_NODE_TYPE_PARALLEL_HASHJOIN: {
// Create hash plan node
expression::AbstractExpression *right_table_attr_1 =
@@ -492,6 +459,24 @@ void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
}
}

+// XXX Wait for all hash_executor tasks to finish
+// This is a temporary code because we don't have hash join executor ready
+while (true) {
+std::this_thread::sleep_for(std::chrono::milliseconds(10));
+size_t num_tuples = hash_executor->GetTotalNumTuples();
+size_t expected_num_tuples =
+tile_group_size * right_table_tile_group_count;
+if (join_test_type == RIGHT_TABLE_EMPTY ||
+join_test_type == BOTH_TABLES_EMPTY) {
+expected_num_tuples = 0;
+}
+EXPECT_TRUE(expected_num_tuples >= num_tuples);
+// All tuples have been processed
+if (expected_num_tuples == num_tuples) {
+break;
+}
+}

// Construct the executor tree
hash_join_executor.AddChild(&left_table_scan_executor);
PL_ASSERT(hash_executor != nullptr);
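Since there is no hash join executor yet to drain the pipeline, the test polls the hash executor's tuple counter until every submitted hash task has finished (the loop marked XXX above). A small sketch of that polling pattern in isolation; the names here are illustrative, not the test's actual symbols:

#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Spin with a short sleep until worker tasks have accounted for the expected
// number of tuples, mirroring the 10 ms polling loop in the test.
void WaitForTupleCount(const std::atomic<std::size_t> &num_tuples,
                       std::size_t expected_num_tuples) {
  while (num_tuples.load() < expected_num_tuples) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}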
