Bug fix for parallel_join_test [cmu-db#3]
eric-haibin-lin committed Dec 2, 2016
1 parent 0780e24 commit a030e50
Showing 3 changed files with 25 additions and 62 deletions.
19 changes: 12 additions & 7 deletions src/planner/parallel_hash_plan.cpp
@@ -125,15 +125,20 @@ ParallelHashPlan::DependencyCompleteHelper(
     tasks.push_back(next_task);
   }
 
-  for (auto task : tasks) {
-    executor::HashTask *hash_task =
-        static_cast<executor::HashTask *>(task.get());
-    partitioned_executor_thread_pool.SubmitTask(
-        hash_task->partition_id, executor::ParallelHashExecutor::ExecuteTask,
-        std::move(task));
+  if (num_tasks == 0) {
+    // No task to do. Immediately notify dependent
+    task->dependent->parent_dependent->DependencyComplete(task);
+  } else {
+    for (auto task : tasks) {
+      executor::HashTask *hash_task =
+          static_cast<executor::HashTask *>(task.get());
+      partitioned_executor_thread_pool.SubmitTask(
+          hash_task->partition_id, executor::ParallelHashExecutor::ExecuteTask,
+          std::move(task));
+    }
   }
 
-  LOG_DEBUG("%d hash tasks submitted", (int)tasks.size());
+  LOG_DEBUG("%d hash tasks submitted", (int)num_tasks);
 
   // XXX This is a hack to let join test pass
   hash_executor->SetChildTiles(result_tile_lists);
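Note on the fix above: DependencyCompleteHelper can now be handed an empty task list (num_tasks == 0), for example when a join input table is empty. In that case no hash task would ever run, so the dependent's parent is notified immediately instead of being left waiting for completions that never arrive. Below is a minimal, self-contained sketch of that dispatch-or-notify pattern in plain C++. The names FakeTask, SubmitToPartition and DispatchOrNotify are invented for illustration and are not Peloton's thread-pool or task API; in the real executor the completion callback is presumably fired by the last hash task to finish, which this sketch only approximates by notifying after all workers are joined.

// Illustrative sketch only -- not Peloton code. Shows the pattern introduced by
// the fix: with zero tasks, fire the downstream completion callback directly;
// otherwise submit one task per partition.
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

struct FakeTask {
  int partition_id;            // which partition this task would hash
  std::function<void()> work;  // the work the task performs
};

// Hypothetical stand-in for a partitioned thread pool's SubmitTask():
// here each task simply gets its own worker thread.
void SubmitToPartition(std::vector<std::thread> &workers, const FakeTask &task) {
  std::printf("submitting hash task for partition %d\n", task.partition_id);
  workers.emplace_back(task.work);
}

void DispatchOrNotify(std::vector<FakeTask> tasks,
                      const std::function<void()> &dependency_complete) {
  if (tasks.empty()) {
    // No task to do: notify the dependent right away, otherwise it would
    // block forever waiting on completions that never arrive.
    dependency_complete();
    return;
  }
  std::vector<std::thread> workers;
  for (const auto &task : tasks) {
    SubmitToPartition(workers, task);
  }
  std::printf("%d hash tasks submitted\n", (int)tasks.size());
  for (auto &worker : workers) worker.join();
  // In this sketch the dispatcher notifies after joining all workers; in the
  // real executor the notification presumably comes from the tasks themselves.
  dependency_complete();
}

int main() {
  // Empty input (e.g. an empty build-side table): the callback fires immediately.
  DispatchOrNotify({}, [] { std::printf("dependency complete (no tasks)\n"); });

  // Non-empty input: one task per partition is dispatched.
  std::vector<FakeTask> tasks = {
      {0, [] { std::printf("hashing partition 0\n"); }},
      {1, [] { std::printf("hashing partition 1\n"); }}};
  DispatchOrNotify(tasks, [] { std::printf("dependency complete\n"); });
  return 0;
}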
46 changes: 2 additions & 44 deletions test/executor/parallel_hash_test.cpp
@@ -102,8 +102,6 @@ TEST_F(ParallelHashTests, BasicTest) {
   std::unique_ptr<executor::ExecutorContext> context(
       new executor::ExecutorContext(txn));
 
-  // TODO Want to pass the context from one task to another
-
   // Create seq scan executor
   // We probably don't need this executor instantiated
   std::shared_ptr<executor::ParallelSeqScanExecutor> seq_scan_executor(
@@ -131,48 +129,8 @@
   wait->WaitForCompletion();
   txn_manager.CommitTransaction(txn);
 
-  // // Create hash executor
-  // std::shared_ptr<executor::ParallelHashExecutor> hash_executor(
-  // new executor::ParallelHashExecutor(hash_plan_node.get(), nullptr));
-
-  // std::vector<std::shared_ptr<executor::AbstractTask>> seq_scan_tasks;
-  // for (size_t task_id = 0; task_id < num_seq_scan_tasks; task_id++) {
-  //
-  // Create seq scan tasks
-  // std::shared_ptr<executor::AbstractTask> task(new executor::SeqScanTask(
-  // hash_plan_node.get(), INVALID_TASK_ID, INVALID_PARTITION_ID,
-  // table_logical_tile_lists));
-
-  // Init task with num tasks
-  // task->Init(seq_scan_executor.get(), hash_plan_node.get(),
-  // num_seq_scan_tasks);
-
-  // Insert to the list
-  // seq_scan_tasks.push_back(task);
-  // }
-
-  // seq_scan_executor->SetNumTasks(num_seq_scan_tasks);
-
-  // TODO Execute seq scan in parallel, at the end of seq scan task, invoke
-  // callback on parent node (hash planner node)
-
-  // Generate a seq scan node with parent points to hash plan node
-
-  // TODO Generate a list of tasks
-
-  // Submit task to thread pool, which generates the executor.
-
-  // child_executor = new executor::SeqScanExecutor(plan, executor_context);
-
-  // Loop until the last seq scan task completes
-  // for (size_t task_id = 0; task_id < num_seq_scan_tasks; task_id++) {
-  //   auto task = seq_scan_tasks[task_id];
-  //   if (task->trackable->TaskComplete()) {
-  //     PL_ASSERT(task_id == num_seq_scan_tasks - 1);
-  //     hash_executor = hash_plan_node->DependencyCompleteHelper(task, false);
-  //   }
-  // }
-
   // TODO Because we don't have the reference to the hash executor, so we cannot
   // validate the number of tuples in the hash table now..
   // while (true) {
   //   std::this_thread::sleep_for(std::chrono::milliseconds(10));
   //   size_t num_tuples = hash_executor->GetTotalNumTuples();
22 changes: 11 additions & 11 deletions test/executor/parallel_join_test.cpp
@@ -23,6 +23,7 @@
 #include "executor/parallel_seq_scan_executor.h"
 #include "executor/merge_join_executor.h"
 #include "executor/nested_loop_join_executor.h"
+#include "executor/plan_executor.h"
 
 #include "expression/abstract_expression.h"
 #include "expression/tuple_value_expression.h"
@@ -412,7 +413,7 @@ void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
       new expression::TupleValueExpression(common::Type::INTEGER, 1,
                                            1)});
 
-  // Create executor context
+  // Create executor context with empty txn
   std::shared_ptr<executor::ExecutorContext> context(
       new executor::ExecutorContext(nullptr));
 
@@ -425,8 +426,12 @@ void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
       new planner::ParallelHashJoinPlan(join_type, std::move(predicate),
                                         std::move(projection), schema));
 
+  // Create a blocking wait at the top of hash executor because the hash
+  // join executor is not ready yet..
+  std::unique_ptr<bridge::BlockingWait> wait(new bridge::BlockingWait(1));
+
   // Set the dependent of hash plan MANUALLY
-  hash_plan_node->parent_dependent = hash_join_plan_node.get();
+  hash_plan_node->parent_dependent = wait.get();
 
   // Create seq scan executor
   std::shared_ptr<executor::ParallelSeqScanExecutor> seq_scan_executor(
@@ -462,22 +467,17 @@ void ExecuteJoinTest(PlanNodeType join_algorithm, PelotonJoinType join_type,
     }
   }
 
-  // XXX Wait for all hash_executor tasks to finish
-  // This is a temporary code because we don't have hash join executor ready
-  while (true) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  wait->WaitForCompletion();
+  // Validate the number of hash tuples
+  {
     size_t num_tuples = hash_executor->GetTotalNumTuples();
     size_t expected_num_tuples =
         tile_group_size * right_table_tile_group_count;
     if (join_test_type == RIGHT_TABLE_EMPTY ||
         join_test_type == BOTH_TABLES_EMPTY) {
       expected_num_tuples = 0;
     }
-    EXPECT_TRUE(expected_num_tuples >= num_tuples);
-    // All tuples have been processed
-    if (expected_num_tuples == num_tuples) {
-      break;
-    }
+    EXPECT_TRUE(expected_num_tuples == num_tuples);
   }
 
   // Construct the executor tree
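Note on the test change above: the sleep-and-poll loop is replaced by a single wait->WaitForCompletion() on a bridge::BlockingWait constructed with a count of 1 and wired in as the hash plan's parent_dependent, so the test blocks until the hash stage reports completion. Once the wait returns, all hash tasks have finished, which is why the tuple-count assertion can tighten from >= to exact equality. Below is a minimal countdown-latch sketch of that wait-for-N-completions pattern. The class CountdownWait is invented here for illustration; it is not the actual bridge::BlockingWait implementation.

// Illustrative sketch only -- not Peloton's bridge::BlockingWait. A countdown
// latch: WaitForCompletion() blocks until DependencyComplete() has been called
// the expected number of times.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

class CountdownWait {
 public:
  explicit CountdownWait(int count) : remaining_(count) {}

  // Called by the producer side, e.g. when the hash stage finishes.
  void DependencyComplete() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--remaining_ <= 0) cv_.notify_all();
  }

  // Called by the consumer (the test): returns only after every expected
  // completion has arrived -- no sleep/poll loop required.
  void WaitForCompletion() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return remaining_ <= 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int remaining_;
};

int main() {
  CountdownWait wait(1);  // one dependency: the hash stage

  std::thread hash_stage([&wait] {
    std::printf("hash stage done, notifying dependent\n");
    wait.DependencyComplete();
  });

  wait.WaitForCompletion();
  // At this point every expected tuple is in the hash table, so an exact
  // equality check (rather than >=) is safe.
  std::printf("validated hash table contents\n");
  hash_stage.join();
  return 0;
}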
