
Commit

[cmu-db#3] Use callback to rechunk results and launch hash executor
eric-haibin-lin committed Dec 2, 2016
1 parent cc4b964 commit 9e0c41d
Showing 12 changed files with 413 additions and 133 deletions.
24 changes: 18 additions & 6 deletions src/executor/parallel_hash_executor.cpp
@@ -35,12 +35,17 @@ ParallelHashExecutor::ParallelHashExecutor(const planner::AbstractPlan *node,
  * @return true on success, false otherwise.
  */
 bool ParallelHashExecutor::DInit() {
-  PL_ASSERT(children_.size() == 1);
-  // Initialize executor state
-  result_itr = 0;
+  if (initialized_ == false) {
+    // TODO Add reference node to the seq scan executors
+    // PL_ASSERT(children_.size() == 1);

-  // Initialize the hash keys
-  InitHashKeys();
+    // Initialize executor state
+    result_itr = 0;
+
+    // Initialize the hash keys
+    InitHashKeys();
+  }
+  initialized_ = true;
   return true;
 }

@@ -65,7 +70,10 @@ void ParallelHashExecutor::InitHashKeys() {
   }
 }

-void ParallelHashExecutor::ExecuteTask(std::shared_ptr<HashTask> hash_task) {
+void ParallelHashExecutor::ExecuteTask(std::shared_ptr<AbstractTask> task) {
+  PL_ASSERT(task->GetTaskType() == TASK_HASH);
+  executor::HashTask *hash_task = static_cast<executor::HashTask *>(task.get());
+
   // Construct the hash table by going over each child logical tile and hashing
   auto task_id = hash_task->task_id;
   auto child_tiles = hash_task->result_tile_lists;
@@ -109,6 +117,10 @@ void ParallelHashExecutor::ExecuteTask(std::shared_ptr<HashTask> hash_task) {
 bool ParallelHashExecutor::DExecute() {
   LOG_TRACE("Hash Executor");

+  if (child_tiles_->size() == 0) {
+    return false;
+  }
+
   // Return logical tiles one at a time
   while (result_itr < (*child_tiles_)[0].size()) {
     if ((*child_tiles_)[0][result_itr]->GetTupleCount() == 0) {
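
ExecuteTask now takes the generic std::shared_ptr<AbstractTask>, asserts the task-type tag, and downcasts to HashTask, so one launch path can hand any task kind to its executor. A standalone sketch of that tagged-downcast pattern (TaskBase and HashWork are illustrative stand-ins, not Peloton's types):

#include <cassert>
#include <cstdio>
#include <memory>

enum class TaskType { SEQ_SCAN, HASH };

struct TaskBase {
  virtual ~TaskBase() = default;
  virtual TaskType GetTaskType() const = 0;
};

struct HashWork : TaskBase {
  explicit HashWork(size_t task_id) : task_id(task_id) {}
  TaskType GetTaskType() const override { return TaskType::HASH; }
  size_t task_id;
};

// Generic entry point: verify the tag, then downcast to the concrete task.
void ExecuteHashTask(std::shared_ptr<TaskBase> task) {
  assert(task->GetTaskType() == TaskType::HASH);
  auto *hash_work = static_cast<HashWork *>(task.get());
  std::printf("executing hash task %zu\n", hash_work->task_id);
}

int main() {
  std::shared_ptr<TaskBase> task = std::make_shared<HashWork>(3);
  ExecuteHashTask(task);  // dispatched through the base-class handle
}
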
61 changes: 0 additions & 61 deletions src/executor/plan_executor.cpp
@@ -36,67 +36,6 @@ executor::AbstractExecutor *BuildExecutorTree(

 void CleanExecutorTree(executor::AbstractExecutor *root);

-void HashCallback::TaskComplete(
-    UNUSED_ATTRIBUTE std::shared_ptr<executor::AbstractTask> task) {
-  // Increment the number of tasks completed
-  int task_num = tasks_complete_.fetch_add(1);
-  // This is the last task
-  if (task_num == total_tasks_ - 1) {
-    // Get the total number of partition
-    size_t num_partitions = PL_NUM_PARTITIONS();
-
-    // Group the results based on partitions
-    executor::LogicalTileLists partitioned_result_tile_lists(num_partitions);
-    for (auto &result_tile_list : *(task->result_tile_lists)) {
-      for (auto &result_tile : result_tile_list) {
-        size_t partition = result_tile->GetPartition();
-        partitioned_result_tile_lists[partition]
-            .push_back(std::move(result_tile));
-      }
-    }
-    // Populate tasks for each partition and re-chunk the tiles
-    std::shared_ptr<executor::LogicalTileLists> result_tile_lists(
-        new executor::LogicalTileLists());
-    for (size_t partition = 0; partition < num_partitions; partition++) {
-      executor::LogicalTileList next_result_tile_list;
-
-      for (auto &result_tile_list : partitioned_result_tile_lists) {
-        // TODO we should re-chunk based on number of tuples?
-        for (auto &result_tile : result_tile_list) {
-          next_result_tile_list.push_back(std::move(result_tile));
-          // Reached the limit of each chunk
-          if (next_result_tile_list.size() >= TASK_TILEGROUP_COUNT) {
-            result_tile_lists->push_back(std::move(next_result_tile_list));
-            next_result_tile_list = executor::LogicalTileList();
-          }
-        }
-        // Check the remaining result tiles
-        if (next_result_tile_list.size() > 0) {
-          result_tile_lists->push_back(std::move(next_result_tile_list));
-        }
-      }
-    }
-
-    size_t num_tasks = result_tile_lists->size();
-    // A list of all tasks to execute
-    std::vector<std::shared_ptr<executor::AbstractTask>> tasks;
-    for (size_t task_id = 0; task_id < num_tasks; task_id++) {
-      // Construct a hash task
-      std::shared_ptr<executor::AbstractTask> next_task(new executor::HashTask(
-          hash_executor_->GetRawNode(), hash_executor_, task_id,
-          result_tile_lists->at(task_id).at(0)->GetPartition(),
-          result_tile_lists));
-      // next_task->Init(next_callback, num_tasks);
-      tasks.push_back(next_task);
-    }
-
-    // TODO Launch the new tasks?
-    // for (auto &task : tasks) {
-
-    //}
-  }
-}
-
 /**
  * @brief Build a executor tree and execute it.
  * Use std::vector<common::Value> as params to make it more elegant for
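
The block removed above is the hash callback that fired when the last scan task finished: it grouped the scans' result tiles by partition, re-chunked each partition's tiles into lists of at most TASK_TILEGROUP_COUNT, and built one hash task per chunk — the re-chunk-and-launch step this commit reworks to run from a callback. A standalone sketch of the group-then-rechunk step, with plain integers standing in for logical tiles and a chunk size of 2 standing in for TASK_TILEGROUP_COUNT:

#include <cstdio>
#include <utility>
#include <vector>

// Each "tile" carries the partition it belongs to; here it is just a pair
// of (partition_id, payload) standing in for a LogicalTile.
using Tile = std::pair<size_t, int>;

int main() {
  const size_t num_partitions = 2;
  const size_t chunk_size = 2;  // stand-in for TASK_TILEGROUP_COUNT

  // Results produced by several scan tasks, in arbitrary partition order.
  std::vector<std::vector<Tile>> task_results = {
      {{0, 10}, {1, 11}}, {{1, 12}, {0, 13}, {0, 14}}};

  // Step 1: group tiles by partition.
  std::vector<std::vector<Tile>> per_partition(num_partitions);
  for (auto &result : task_results) {
    for (auto &tile : result) per_partition[tile.first].push_back(tile);
  }

  // Step 2: re-chunk each partition's tiles into task-sized lists.
  std::vector<std::vector<Tile>> rechunked;
  for (auto &bucket : per_partition) {
    std::vector<Tile> chunk;
    for (auto &tile : bucket) {
      chunk.push_back(tile);
      if (chunk.size() >= chunk_size) {  // reached the per-task limit
        rechunked.push_back(std::move(chunk));
        chunk.clear();
      }
    }
    if (!chunk.empty()) rechunked.push_back(std::move(chunk));  // remainder
  }

  // One downstream (hash) task would be launched per chunk.
  for (size_t i = 0; i < rechunked.size(); i++) {
    std::printf("task %zu gets %zu tile(s) from partition %zu\n", i,
                rechunked[i].size(), rechunked[i][0].first);
  }
}
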
17 changes: 7 additions & 10 deletions src/include/executor/abstract_task.h
@@ -24,11 +24,8 @@

 namespace peloton {

-namespace bridge {
-class Notifiable;
-}
-
 namespace planner {
+class Notifiable;
 class AbstractPlan;
 }

@@ -66,7 +63,7 @@
       : node(node), result_tile_lists(result_tile_lists) {}

   // Initialize the task with callbacks
-  inline void Init(bridge::Notifiable *callback, int num_tasks) {
+  inline void Init(planner::Notifiable *callback, int num_tasks) {
     this->callback = callback;
     if (result_tile_lists != nullptr) {
       result_tile_lists->resize(num_tasks);
@@ -83,7 +80,7 @@
   std::shared_ptr<LogicalTileLists> result_tile_lists;

   // The callback to call after task completes
-  bridge::Notifiable *callback = nullptr;
+  planner::Notifiable *callback = nullptr;

   // Whether the task is initialized
   bool initialized = false;
@@ -101,7 +98,7 @@ class PartitionAwareTask : public AbstractTask {
         task_id(task_id),
         partition_id(partition_id) {}

-  inline LogicalTileList& GetResultTileList() {
+  inline LogicalTileList &GetResultTileList() {
     return (*result_tile_lists)[task_id];
   }

@@ -159,14 +156,14 @@ class HashTask : public PartitionAwareTask {
    * @param bulk_insert_count: The total bulk insert count in insert plan node
    */
   explicit HashTask(const planner::AbstractPlan *node,
-                    ParallelHashExecutor *hash_executor, size_t task_id,
-                    size_t partition_id,
+                    std::shared_ptr<ParallelHashExecutor> hash_executor,
+                    size_t task_id, size_t partition_id,
                     std::shared_ptr<LogicalTileLists> result_tile_lists)
       : PartitionAwareTask(node, task_id, partition_id, result_tile_lists),
         hash_executor(hash_executor) {}

   // The hash executor object
-  ParallelHashExecutor *hash_executor;
+  std::shared_ptr<ParallelHashExecutor> hash_executor;
 };

 // The class for parallel seq scan tasks
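
HashTask now holds the hash executor through std::shared_ptr rather than a raw pointer, so every outstanding task co-owns the executor and it cannot be torn down while a worker is still hashing. A standalone sketch of that shared-ownership pattern (Executor and Task are illustrative stand-ins, not the Peloton classes):

#include <cstdio>
#include <memory>
#include <vector>

struct Executor {
  Executor() { std::printf("executor created\n"); }
  ~Executor() { std::printf("executor destroyed\n"); }
  void BuildHashTable(int task_id) { std::printf("task %d hashed\n", task_id); }
};

struct Task {
  std::shared_ptr<Executor> executor;  // each task co-owns the executor
  int task_id;
};

int main() {
  std::vector<Task> tasks;
  {
    auto executor = std::make_shared<Executor>();
    for (int i = 0; i < 3; i++) tasks.push_back({executor, i});
  }  // the creating scope's handle is gone, but the tasks keep it alive

  for (auto &task : tasks) task.executor->BuildHashTable(task.task_id);
  // The executor is destroyed only after the last task releases its reference.
}
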
4 changes: 3 additions & 1 deletion src/include/executor/parallel_hash_executor.h
@@ -57,7 +57,7 @@ class ParallelHashExecutor : public AbstractExecutor {
   inline const std::vector<oid_t> &GetHashKeyIds() const { return column_ids_; }

   // Execute the hash task
-  static void ExecuteTask(std::shared_ptr<HashTask> hash_task);
+  static void ExecuteTask(std::shared_ptr<AbstractTask> hash_task);

   // TODO This is a hack. Remove me when we hook up hash executor with seq scan
   // executor
@@ -83,6 +83,8 @@
   std::vector<oid_t> column_ids_;

   size_t result_itr = 0;
+
+  bool initialized_ = false;
 };

 } /* namespace executor */
35 changes: 2 additions & 33 deletions src/include/executor/plan_executor.h
@@ -15,6 +15,7 @@

 #include "common/statement.h"
 #include "common/types.h"
+#include "planner/abstract_callback.h"
 #include "concurrency/transaction_manager.h"
 #include "executor/abstract_executor.h"
 #include "executor/abstract_task.h"
@@ -56,16 +57,7 @@ typedef struct peloton_status {
 /*
  * This class can be notified when a task completes
  */
-class Notifiable {
- public:
-  virtual ~Notifiable() {}
-  virtual void TaskComplete(std::shared_ptr<executor::AbstractTask> task) = 0;
-};
-
-/*
- * This class can be notified when a task completes
- */
-class BlockingWait : public Notifiable {
+class BlockingWait : public planner::Notifiable {
  public:
   BlockingWait(int total_tasks)
       : Notifiable(),
@@ -103,29 +95,6 @@ class BlockingWait : public Notifiable {
   std::condition_variable cv;
 };

-/*
- * This class can be notified when a task completes
- */
-class HashCallback : public Notifiable {
- public:
-  HashCallback(int total_tasks, executor::ParallelHashExecutor *hash_executor)
-      : Notifiable(),
-        total_tasks_(total_tasks),
-        tasks_complete_(0),
-        hash_executor_(hash_executor) {}
-
-  ~HashCallback() {}
-
-  // when a task completes it will call this
-  // Assume the task is seq scan task
-  void TaskComplete(std::shared_ptr<executor::AbstractTask> task) override;
-
- private:
-  int total_tasks_;
-  std::atomic<int> tasks_complete_;
-  executor::ParallelHashExecutor *hash_executor_;
-};
-
 /*
  * Struct to hold parameters used by the exchange operator
  */
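
BlockingWait, which stays in this header and now derives from planner::Notifiable, is the synchronous counterpart to the hash callback: completions are counted and a coordinating thread blocks on the condition variable shown above until every task has reported in. A standalone sketch of that wait-for-all shape (WaitForAll and its TaskComplete/Wait pair are illustrative stand-ins, not Peloton's exact interface):

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

class WaitForAll {
 public:
  explicit WaitForAll(int total_tasks) : total_tasks_(total_tasks) {}

  // Called by each worker when its task finishes.
  void TaskComplete() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (++tasks_complete_ == total_tasks_) cv_.notify_all();
  }

  // Called by the coordinating thread; returns once every task is done.
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return tasks_complete_ == total_tasks_; });
  }

 private:
  int total_tasks_;
  int tasks_complete_ = 0;
  std::mutex mutex_;
  std::condition_variable cv_;
};

int main() {
  const int num_tasks = 4;
  WaitForAll wait(num_tasks);

  std::vector<std::thread> workers;
  for (int i = 0; i < num_tasks; i++) {
    workers.emplace_back([&wait, i] {
      std::printf("task %d done\n", i);
      wait.TaskComplete();
    });
  }

  wait.Wait();  // blocks until all four workers have reported completion
  for (auto &worker : workers) worker.join();
  std::printf("all tasks complete\n");
}
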
62 changes: 62 additions & 0 deletions src/include/planner/abstract_callback.h
@@ -0,0 +1,62 @@
//===----------------------------------------------------------------------===//
//
// Peloton
//
// abstract_callback.h
//
// Identification: src/include/planner/abstract_callback.h
//
// Copyright (c) 2015-16, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#pragma once

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <vector>

#include "catalog/schema.h"
#include "common/printable.h"
#include "common/serializeio.h"
#include "common/serializer.h"
#include "common/types.h"
#include "common/value.h"

namespace peloton {

namespace executor {
class AbstractTask;
// class LogicalTile;
}
//
// namespace catalog {
// class Schema;
//}
//
// namespace expression {
// class AbstractExpression;
//}

namespace planner {

//===--------------------------------------------------------------------===//
// Abstract Callback
//===--------------------------------------------------------------------===//
/*
* This class can be notified when a task completes
* This class is used when dependency completes. Need another class for task
* completion
*/
class Notifiable {
public:
virtual ~Notifiable() {}
// TODO Rename this function to Dependency complete..
virtual void TaskComplete(std::shared_ptr<executor::AbstractTask> task) = 0;
};

} // namespace planner
} // namespace peloton
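
The planner-level Notifiable above is the hook a task carries so that whatever scheduled it can react when it (or the dependency it represents) completes; the TODO notes the hook may be renamed to reflect dependency completion. A minimal sketch of implementing the interface, using a stand-in Task type since executor::AbstractTask is only forward-declared here (CountingCallback is illustrative, not a Peloton class):

#include <cstdio>
#include <memory>

// Stand-in for executor::AbstractTask, which the real header only forward-declares.
struct Task {
  int task_id;
};

// Same shape as planner::Notifiable: one virtual completion hook.
class Notifiable {
 public:
  virtual ~Notifiable() {}
  virtual void TaskComplete(std::shared_ptr<Task> task) = 0;
};

// A callback that counts completions and triggers follow-up work on the last one.
class CountingCallback : public Notifiable {
 public:
  explicit CountingCallback(int total_tasks) : total_tasks_(total_tasks) {}

  void TaskComplete(std::shared_ptr<Task> task) override {
    std::printf("task %d complete\n", task->task_id);
    if (++tasks_complete_ == total_tasks_)
      std::printf("last task finished; launch the next stage here\n");
  }

 private:
  int total_tasks_;
  int tasks_complete_ = 0;
};

int main() {
  CountingCallback callback(2);
  callback.TaskComplete(std::make_shared<Task>(Task{0}));
  callback.TaskComplete(std::make_shared<Task>(Task{1}));
}
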
5 changes: 5 additions & 0 deletions src/include/planner/abstract_plan.h
@@ -107,6 +107,11 @@ class AbstractPlan : public Printable {
   }
   virtual int SerializeSize() { return 0; }

+  // TODO Move me to private
+  void SetDependentParent(AbstractPlan *dependent_parent) {
+    dependent_parent_ = dependent_parent;
+  }
+
  protected:
   // only used by its derived classes (when deserialization)
   AbstractPlan *Parent() { return parent_; }
69 changes: 69 additions & 0 deletions src/include/planner/parallel_hash_join_plan.h
@@ -0,0 +1,69 @@
//===----------------------------------------------------------------------===//
//
// Peloton
//
// parallel_hash_join_plan.h
//
// Identification: src/include/planner/parallel_hash_join_plan.h
//
// Copyright (c) 2015-16, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#pragma once

#include "planner/abstract_join_plan.h"

namespace peloton {
namespace expression {
class AbstractExpression;
}
namespace planner {

class ParallelHashJoinPlan : public AbstractJoinPlan {
public:
ParallelHashJoinPlan(const ParallelHashJoinPlan &) = delete;
ParallelHashJoinPlan &operator=(const ParallelHashJoinPlan &) = delete;
ParallelHashJoinPlan(ParallelHashJoinPlan &&) = delete;
ParallelHashJoinPlan &operator=(ParallelHashJoinPlan &&) = delete;

ParallelHashJoinPlan(
PelotonJoinType join_type,
std::unique_ptr<const expression::AbstractExpression> &&predicate,
std::unique_ptr<const ProjectInfo> &&proj_info,
std::shared_ptr<const catalog::Schema> &proj_schema);

ParallelHashJoinPlan(
PelotonJoinType join_type,
std::unique_ptr<const expression::AbstractExpression> &&predicate,
std::unique_ptr<const ProjectInfo> &&proj_info,
std::shared_ptr<const catalog::Schema> &proj_schema,
const std::vector<oid_t> &outer_hashkeys);

inline PlanNodeType GetPlanNodeType() const {
return PLAN_NODE_TYPE_PARALLEL_HASHJOIN;
}

const std::string GetInfo() const { return "ParallelHashJoin"; }

const std::vector<oid_t> &GetOuterHashIds() const {
return outer_column_ids_;
}

std::unique_ptr<AbstractPlan> Copy() const {
std::unique_ptr<const expression::AbstractExpression> predicate_copy(
GetPredicate()->Copy());
std::shared_ptr<const catalog::Schema> schema_copy(
catalog::Schema::CopySchema(GetSchema()));
ParallelHashJoinPlan *new_plan = new ParallelHashJoinPlan(
GetJoinType(), std::move(predicate_copy),
std::move(GetProjInfo()->Copy()), schema_copy, outer_column_ids_);
return std::unique_ptr<AbstractPlan>(new_plan);
}

private:
std::vector<oid_t> outer_column_ids_;
};

} // namespace planner
} // namespace peloton
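
ParallelHashJoinPlan::Copy() follows the plan-node cloning convention: deep-copy the owned predicate and projection info, share the schema, and hand the clone back as std::unique_ptr<AbstractPlan>. A standalone sketch of that clone-through-the-base-class pattern (PlanNode and HashJoinNode are illustrative stand-ins, not the Peloton plan classes):

#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct PlanNode {
  virtual ~PlanNode() = default;
  virtual std::unique_ptr<PlanNode> Copy() const = 0;
  virtual std::string GetInfo() const = 0;
};

struct HashJoinNode : PlanNode {
  explicit HashJoinNode(std::vector<int> outer_keys)
      : outer_keys(std::move(outer_keys)) {}

  // Deep-copy owned state; the clone is handed back through the base type.
  std::unique_ptr<PlanNode> Copy() const override {
    return std::unique_ptr<PlanNode>(new HashJoinNode(outer_keys));
  }

  std::string GetInfo() const override { return "HashJoin"; }

  std::vector<int> outer_keys;
};

int main() {
  HashJoinNode original({0, 2});
  std::unique_ptr<PlanNode> clone = original.Copy();
  std::printf("cloned node: %s\n", clone->GetInfo().c_str());
}
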
