From b1d65e1196e1c129c47e68b43a1296c94854e937 Mon Sep 17 00:00:00 2001
From: eric-haibin-lin
Date: Mon, 28 Nov 2016 15:28:27 -0500
Subject: [PATCH] Reserve space for hash table [#3]

---
 src/include/executor/parallel_hash_executor.h |  2 +
 src/planner/parallel_hash_plan.cpp            | 19 +++---
 test/executor/parallel_join_test.cpp          | 59 +++++++------------
 3 files changed, 35 insertions(+), 45 deletions(-)

diff --git a/src/include/executor/parallel_hash_executor.h b/src/include/executor/parallel_hash_executor.h
index a6b365cf408..2b0b637552f 100644
--- a/src/include/executor/parallel_hash_executor.h
+++ b/src/include/executor/parallel_hash_executor.h
@@ -85,6 +85,8 @@ class ParallelHashExecutor : public AbstractExecutor, public Trackable {
     total_num_tuples_.fetch_add(num_tuples);
   }
 
+  inline void Reserve(size_t num_tuples) { hash_table_.reserve(num_tuples); }
+
  protected:
   // Initialize the values of the hash keys from plan node
   void InitHashKeys();
diff --git a/src/planner/parallel_hash_plan.cpp b/src/planner/parallel_hash_plan.cpp
index d7d7a2d14bf..2194ec1e75d 100644
--- a/src/planner/parallel_hash_plan.cpp
+++ b/src/planner/parallel_hash_plan.cpp
@@ -40,10 +40,13 @@ ParallelHashPlan::DependencyCompleteHelper(
     num_partitions = 1;
   }
 
+  size_t total_num_tuples = 0;
+
   // Group the results based on partitions
   executor::LogicalTileLists partitioned_result_tile_lists(num_partitions);
   for (auto &result_tile_list : *(task->result_tile_lists.get())) {
     for (auto &result_tile : result_tile_list) {
+      total_num_tuples += result_tile->GetTupleCount();
       size_t partition = result_tile->GetPartition();
       if (force_single_partition) {
         partition = 0;
@@ -84,9 +87,12 @@ ParallelHashPlan::DependencyCompleteHelper(
   std::shared_ptr<executor::ParallelHashExecutor> hash_executor(
       new executor::ParallelHashExecutor(this, nullptr));
   hash_executor->SetNumTasks(num_tasks);
+  // Reserve space for hash table
+  hash_executor->Reserve(total_num_tuples);
   hash_executor->Init();
+  LOG_DEBUG("Number of tuples from child: %d", (int)total_num_tuples);
 
-  // TODO Add dummy child node to retrieve result from
+  // TODO Add dummy child node to retrieve result from?
   // hash_executor.AddChild(&right_table_scan_executor);
 
   for (size_t task_id = 0; task_id < num_tasks; task_id++) {
@@ -112,15 +118,12 @@ ParallelHashPlan::DependencyCompleteHelper(
   for (auto task : tasks) {
     executor::HashTask *hash_task =
         static_cast<executor::HashTask *>(task.get());
-    if (force_single_partition) {
-      executor::ParallelHashExecutor::ExecuteTask(task);
-    } else {
-      partitioned_executor_thread_pool.SubmitTask(
-          hash_task->partition_id, executor::ParallelHashExecutor::ExecuteTask,
-          std::move(task));
-    }
+    partitioned_executor_thread_pool.SubmitTask(
+        hash_task->partition_id, executor::ParallelHashExecutor::ExecuteTask,
+        std::move(task));
   }
+  LOG_DEBUG("%d hash tasks submitted", (int)tasks.size());
 
   // XXX This is a hack to let join test pass
   hash_executor->SetChildTiles(result_tile_lists);
   return std::move(hash_executor);
diff --git a/test/executor/parallel_join_test.cpp b/test/executor/parallel_join_test.cpp
index 7d12bfe25ad..3de7f893539 100644
--- a/test/executor/parallel_join_test.cpp
+++ b/test/executor/parallel_join_test.cpp
@@ -171,11 +171,13 @@ TEST_F(ParallelJoinTests, JoinPredicateTest) {
 
 void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
                      oid_t join_test_type) {
+  // start executor pool
+  ExecutorPoolHarness::GetInstance();
+
   //===--------------------------------------------------------------------===//
   // Mock table scan executors
   //===--------------------------------------------------------------------===//
-
-  MockExecutor left_table_scan_executor, right_table_scan_executor;
+  MockExecutor left_table_scan_executor;
 
   // Create a table and wrap it in logical tile
   size_t tile_group_size = TESTS_TUPLES_PER_TILEGROUP;
@@ -387,41 +389,6 @@ void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
 
   // Differ based on join algorithm
   switch (join_algorithm) {
-    case PLAN_NODE_TYPE_MERGEJOIN: {
-      // Create join clauses
-      std::vector<planner::MergeJoinPlan::JoinClause> join_clauses;
-      join_clauses = JoinTestsUtil::CreateJoinClauses();
-
-      // Create merge join plan node
-      planner::MergeJoinPlan merge_join_node(join_type, std::move(predicate),
-                                             std::move(projection), schema,
-                                             join_clauses);
-
-      // Construct the merge join executor
-      executor::MergeJoinExecutor merge_join_executor(&merge_join_node,
-                                                      nullptr);
-
-      // Construct the executor tree
-      merge_join_executor.AddChild(&left_table_scan_executor);
-      merge_join_executor.AddChild(&right_table_scan_executor);
-
-      // Run the merge join executor
-      EXPECT_TRUE(merge_join_executor.Init());
-      while (merge_join_executor.Execute() == true) {
-        std::unique_ptr<executor::LogicalTile> result_logical_tile(
-            merge_join_executor.GetOutput());
-
-        if (result_logical_tile != nullptr) {
-          result_tuple_count += result_logical_tile->GetTupleCount();
-          tuples_with_null += JoinTestsUtil::CountTuplesWithNullFields(
-              result_logical_tile.get());
-          JoinTestsUtil::ValidateJoinLogicalTile(result_logical_tile.get());
-          LOG_TRACE("%s", result_logical_tile->GetInfo().c_str());
-        }
-      }
-
-    } break;
-
     case PLAN_NODE_TYPE_PARALLEL_HASHJOIN: {
       // Create hash plan node
       expression::AbstractExpression *right_table_attr_1 =
@@ -492,6 +459,24 @@
     }
   }
 
+  // XXX Wait for all hash_executor tasks to finish
+  // This is temporary code because we don't have the hash join executor ready
+  while (true) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    size_t num_tuples = hash_executor->GetTotalNumTuples();
+    size_t expected_num_tuples =
+        tile_group_size * right_table_tile_group_count;
+    if (join_test_type == RIGHT_TABLE_EMPTY ||
+        join_test_type == BOTH_TABLES_EMPTY) {
+      expected_num_tuples = 0;
+    }
+    EXPECT_TRUE(expected_num_tuples >= num_tuples);
+    // All tuples have been processed
+    if (expected_num_tuples == num_tuples) {
+      break;
+    }
+  }
+
   // Construct the executor tree
   hash_join_executor.AddChild(&left_table_scan_executor);
   PL_ASSERT(hash_executor != nullptr);
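
Note on the Reserve() call: the planner now sums GetTupleCount() over the
child's result tiles while grouping them into partitions, then pre-sizes the
hash table once before any build task runs, so the table does not rehash while
tuples are being inserted. The standalone sketch below shows the effect it
relies on, assuming the underlying hash_table_ offers std::unordered_map-style
reserve() semantics; it uses std::unordered_map purely for illustration, not
Peloton's actual hash table type, and kNumTuples stands in for
total_num_tuples.

#include <cstddef>
#include <cstdio>
#include <unordered_map>

int main() {
  const size_t kNumTuples = 1000000;  // stands in for total_num_tuples

  std::unordered_map<int, int> table;
  // Analogous to hash_executor->Reserve(total_num_tuples): size the bucket
  // array once so the inserts below never trigger a rehash.
  table.reserve(kNumTuples);
  const size_t buckets_before = table.bucket_count();

  for (size_t i = 0; i < kNumTuples; i++) {
    table.emplace(static_cast<int>(i), static_cast<int>(i));
  }

  // The bucket count is unchanged, confirming no rehash happened during the
  // build phase.
  printf("buckets before: %zu, after: %zu\n", buckets_before,
         table.bucket_count());
  return 0;
}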
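The sleep-and-poll loop marked XXX in the test wakes every 10 ms to compare
GetTotalNumTuples() against the expected count. If this temporary
synchronization outlives the prototype, a count-down latch built on a
condition variable removes the polling. The CountDownLatch below is a
hypothetical helper sketched for illustration, not an existing Peloton class;
the idea is that each hash task would call CountDown() when it finishes.

#include <cstddef>
#include <cstdio>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical latch: the waiting thread blocks in Wait() until every worker
// has called CountDown() once, instead of sleeping and re-checking a counter.
class CountDownLatch {
 public:
  explicit CountDownLatch(size_t count) : count_(count) {}

  void CountDown() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (count_ > 0 && --count_ == 0) {
      cv_.notify_all();
    }
  }

  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  size_t count_;
};

int main() {
  const size_t num_tasks = 4;  // e.g. one count per submitted hash task
  CountDownLatch latch(num_tasks);

  std::thread workers[num_tasks];
  for (auto &worker : workers) {
    worker = std::thread([&latch] {
      // ... build one hash table partition here ...
      latch.CountDown();  // signal that this task is done
    });
  }

  latch.Wait();  // replaces the sleep_for/GetTotalNumTuples() polling loop
  printf("all hash tasks finished\n");

  for (auto &worker : workers) {
    worker.join();
  }
  return 0;
}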