Skip to content

Commit

Permalink
Merge pull request #1 from Flash-X/ipdps2025
Browse files Browse the repository at this point in the history
Support ExtendedCpuGpuSplit pattern, and more
  • Loading branch information
kweide authored Nov 27, 2024
2 parents 9ee40d5 + 513f2bd commit 262c450
Show file tree
Hide file tree
Showing 17 changed files with 2,374 additions and 270 deletions.
3 changes: 2 additions & 1 deletion includes/Milhoja_MoverUnpacker.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include <pthread.h>

#include "Milhoja_DataItem.h"
#include "Milhoja_TileWrapper.h"
#include "Milhoja_RuntimeElement.h"

Expand All @@ -63,7 +64,6 @@ class MoverUnpacker : public RuntimeElement {
MoverUnpacker& operator=(const MoverUnpacker&) = delete;
MoverUnpacker& operator=(MoverUnpacker&&) = delete;

void setReceiverProto(TileWrapper const *);
void startCycle(void);
void increaseThreadCount(const unsigned int nThreads) override;
void enqueue(std::shared_ptr<DataItem>&& dataItem) override;
Expand All @@ -72,6 +72,7 @@ class MoverUnpacker : public RuntimeElement {
void wait(void);

// Read-only accessor: returns the dataReceiver_ pointer as currently stored.
RuntimeElement* dataReceiver(void) const { return dataReceiver_; }
// Read-only accessor: returns the receiverPrototype_ pointer as currently stored.
// NOTE(review): whether this can be null is not visible here -- confirm before dereferencing.
const DataItem* receiverPrototype(void) const { return receiverPrototype_; }

private:
enum class State {Idle, Open, Closed};
Expand Down
22 changes: 21 additions & 1 deletion includes/Milhoja_Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,30 @@ class Runtime {
void executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
const unsigned int nDistributorThreads,
const RuntimeAction& actionA_cpu,
const TileWrapper& tilePrototype,
const RuntimeAction& actionA_gpu,
const RuntimeAction& postActionB_cpu,
const DataPacket& packetPrototype,
const RuntimeAction& postActionB_cpu,
const TileWrapper& postTilePrototype,
const unsigned int nTilesPerCpuTurn);
// The three declarations below exist only when RUNTIME_MUST_USE_TILEITER is
// not defined.  Together they form a setup / push / teardown interface for the
// extended CPU/GPU split configuration.
// NOTE(review): presumably the intended call order is setup, then one push per
// tile, then teardown -- confirm against the implementation.
# ifndef RUNTIME_MUST_USE_TILEITER
// Takes the same actions and prototypes as executeExtendedCpuGpuSplitTasks,
// but without a distributor-thread count.
void setupPipelineForExtCpuGpuSplitTasks(const std::string& bundleName,
const RuntimeAction& actionA_cpu,
const TileWrapper& tilePrototype,
const RuntimeAction& actionA_gpu,
const DataPacket& packetPrototype,
const RuntimeAction& postActionB_cpu,
const TileWrapper& postTilePrototype,
const unsigned int nTilesPerCpuTurn);
// Takes the three prototypes plus the raw description of a single tile
// (tP, tI, tR).
void pushTileToExtCpuGpuSplitPipeline(const std::string& bundleName,
const TileWrapper& tilePrototype,
const DataPacket& packetPrototype,
const TileWrapper& postTilePrototype,
const FlashxrTileRawPtrs& tP,
const FlashxTileRawInts& tI,
const FlashxTileRawReals& tR);
// Takes only the bundle name.
void teardownPipelineForExtCpuGpuSplitTasks(const std::string& bundleName);
# endif
void executeCpuGpuWowzaTasks(const std::string& bundleName,
const RuntimeAction& actionA_cpu,
const TileWrapper& tilePrototype,
Expand Down
3 changes: 3 additions & 0 deletions includes/Milhoja_RuntimeElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RuntimeElement {
virtual std::string attachDataReceiver(RuntimeElement* receiver);
virtual std::string detachDataReceiver(void);

// Virtual hook taking a DataItem prototype for the data receiver; declared
// alongside attachDataReceiver/detachDataReceiver and overridable by subclasses.
// NOTE(review): the meaning of the returned string is not visible here --
// presumably an error message that is empty on success; confirm.
virtual std::string setReceiverPrototype(const DataItem* prototype);

protected:
RuntimeElement(void);
virtual ~RuntimeElement(void);
Expand All @@ -58,6 +60,7 @@ class RuntimeElement {
to once this team's action has
already been applied to the
items. */
// Pointer to a DataItem prototype associated with the data receiver.
// NOTE(review): ownership and lifetime are not visible here -- presumably
// non-owning; confirm.
const DataItem* receiverPrototype_;

std::map<const RuntimeElement*,bool> calledCloseQueue_; /*!< The keys in this map serve as a list
of data publishers attached to the object.
Expand Down
2 changes: 2 additions & 0 deletions includes/Milhoja_ThreadTeam.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

#include <pthread.h>

#include "Milhoja_TileWrapper.h"
#include "Milhoja_actionRoutine.h"
#include "Milhoja_RuntimeAction.h"
#include "Milhoja_ThreadTeamMode.h"
Expand Down Expand Up @@ -113,6 +114,7 @@ class ThreadTeam : public RuntimeElement {
// into thread team configurations.
std::string attachDataReceiver(RuntimeElement* receiver) override;
std::string detachDataReceiver(void) override;
// Overrides the virtual setReceiverPrototype inherited from RuntimeElement.
std::string setReceiverPrototype(const DataItem* prototype) override;

protected:
constexpr static unsigned int THREAD_START_STOP_TIMEOUT_SEC = 1;
Expand Down
208 changes: 208 additions & 0 deletions interfaces/Milhoja_runtime_C_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,77 @@ extern "C" {
return MILHOJA_SUCCESS;
}

/**
 * Set up the runtime pipeline for the extended CPU/GPU split configuration.
 *
 * Builds three RuntimeAction objects (CPU, GPU, and post-CPU) from the given
 * routines and forwards them, together with the prototypes, to
 * Runtime::setupPipelineForExtCpuGpuSplitTasks.
 *
 * \param cpuTaskFunction   Routine assigned to the CPU action (BLOCK team type)
 * \param gpuTaskFunction   Routine assigned to the GPU action (SET_OF_BLOCKS
 *                          team type)
 * \param postTaskFunction  Routine assigned to the post CPU action (BLOCK team
 *                          type)
 * \param nThreads          Initial thread count used for all three actions.
 *                          Must not be negative.
 * \param nTilesPerPacket   Tiles per packet for the GPU action.  Must not be
 *                          negative.
 * \param nTilesPerCpuTurn  Forwarded unchanged to the runtime.  Must not be
 *                          negative.
 * \param packet            Pointer to a milhoja::DataPacket prototype.  It is
 *                          dereferenced without a null check.
 * \param tileWrapper       Pointer to a milhoja::TileWrapper prototype.  It is
 *                          dereferenced without a null check.
 * \param postTileWrapper   Pointer to a milhoja::TileWrapper prototype.  It is
 *                          dereferenced without a null check.
 *
 * \return MILHOJA_SUCCESS on success; MILHOJA_ERROR_N_THREADS_NEGATIVE or
 *         MILHOJA_ERROR_N_TILES_NEGATIVE for invalid counts;
 *         MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE if the runtime throws.
 */
int milhoja_runtime_setup_pipeline_extcpugpusplit_c(milhoja::ACTION_ROUTINE cpuTaskFunction,
                                                    milhoja::ACTION_ROUTINE gpuTaskFunction,
                                                    milhoja::ACTION_ROUTINE postTaskFunction,
                                                    const int nThreads,
                                                    const int nTilesPerPacket,
                                                    const int nTilesPerCpuTurn,
                                                    void* packet,
                                                    void* tileWrapper,
                                                    void* postTileWrapper) {
    if (nThreads < 0) {
        std::cerr
            << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nThreads is negative"
            << std::endl;
        return MILHOJA_ERROR_N_THREADS_NEGATIVE;
    } else if (nTilesPerPacket < 0) {
        std::cerr
            << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nTilesPerPacket is negative"
            << std::endl;
        return MILHOJA_ERROR_N_TILES_NEGATIVE;
    } else if (nTilesPerCpuTurn < 0) {
        // Without this check a negative value would wrap around to a huge
        // unsigned count in the cast below.
        std::cerr
            << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nTilesPerCpuTurn is negative"
            << std::endl;
        return MILHOJA_ERROR_N_TILES_NEGATIVE;
    }

    // All values are known to be non-negative, so these casts are lossless.
    const unsigned int nThreads_ui         = static_cast<unsigned int>(nThreads);
    const unsigned int nTilesPerPacket_ui  = static_cast<unsigned int>(nTilesPerPacket);
    const unsigned int nTilesPerCpuTurn_ui = static_cast<unsigned int>(nTilesPerCpuTurn);

    milhoja::TileWrapper* tilePrototype     = static_cast<milhoja::TileWrapper*>(tileWrapper);
    milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
    milhoja::DataPacket*  pktPrototype      = static_cast<milhoja::DataPacket*>(packet);

    milhoja::RuntimeAction pktAction;
    pktAction.name            = "Lazy GPU setup Action Name";
    pktAction.nInitialThreads = nThreads_ui;
    pktAction.teamType        = milhoja::ThreadTeamDataType::SET_OF_BLOCKS;
    pktAction.nTilesPerPacket = nTilesPerPacket_ui;
    pktAction.routine         = gpuTaskFunction;

    milhoja::RuntimeAction cpuAction;
    cpuAction.name            = "Lazy CPU setup Action Name";
    cpuAction.nInitialThreads = nThreads_ui;
    cpuAction.teamType        = milhoja::ThreadTeamDataType::BLOCK;
    cpuAction.nTilesPerPacket = 0;
    cpuAction.routine         = cpuTaskFunction;

    milhoja::RuntimeAction postAction;
    postAction.name            = "Lazy post CPU setup Action Name";
    postAction.nInitialThreads = nThreads_ui;
    postAction.teamType        = milhoja::ThreadTeamDataType::BLOCK;
    postAction.nTilesPerPacket = 0;
    postAction.routine         = postTaskFunction;

    try {
        milhoja::Runtime::instance().setupPipelineForExtCpuGpuSplitTasks("EXT CPUGPU Split Bundle Name",
                                                                         cpuAction,
                                                                         *tilePrototype,
                                                                         pktAction,
                                                                         *pktPrototype,
                                                                         postAction,
                                                                         *postTilePrototype,
                                                                         nTilesPerCpuTurn_ui);
    } catch (const std::exception& exc) {
        std::cerr << exc.what() << std::endl;
        return MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE;
    } catch (...) {
        std::cerr << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
        return MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE;
    }

    return MILHOJA_SUCCESS;
}

int milhoja_runtime_teardown_pipeline_gpu_c(const int nThreads,
const int nTilesPerPacket) {
if (nThreads < 0) { // nThreads: only use in this function
Expand Down Expand Up @@ -679,6 +750,34 @@ extern "C" {

return MILHOJA_SUCCESS;
}

/**
 * Tear down the pipeline of the extended CPU/GPU split configuration.
 *
 * \param nThreads         Only validated; must not be negative.
 * \param nTilesPerPacket  Only validated; must not be negative.
 *
 * \return MILHOJA_SUCCESS on success; MILHOJA_ERROR_N_THREADS_NEGATIVE or
 *         MILHOJA_ERROR_N_TILES_NEGATIVE for invalid counts;
 *         MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE if the runtime throws.
 */
int milhoja_runtime_teardown_pipeline_extcpugpusplit_c(const int nThreads,
                                                       const int nTilesPerPacket) {
    // Both arguments are checked here and not used for anything else.
    if (nThreads < 0) {
        std::cerr
            << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] nThreads is negative"
            << std::endl;
        return MILHOJA_ERROR_N_THREADS_NEGATIVE;
    }
    if (nTilesPerPacket < 0) {
        std::cerr
            << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] nTilesPerPacket is negative"
            << std::endl;
        return MILHOJA_ERROR_N_TILES_NEGATIVE;
    }

    int status = MILHOJA_SUCCESS;
    try {
        milhoja::Runtime::instance().teardownPipelineForExtCpuGpuSplitTasks(
            "Lazy EXT CPUGPU Split setup Bundle Name");
    } catch (const std::exception& err) {
        std::cerr << err.what() << std::endl;
        status = MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE;
    } catch (...) {
        std::cerr << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
        status = MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE;
    }

    return status;
}
/**
* Push one tile to the prepared pipeline so that the thread team will
* eventually execute the task.
Expand Down Expand Up @@ -797,6 +896,37 @@ extern "C" {

return MILHOJA_SUCCESS;
}

/**
 * Push a single tile into the extended CPU/GPU split pipeline.
 *
 * \param tileWrapper      Pointer to a milhoja::TileWrapper prototype.  It is
 *                         dereferenced without a null check.
 * \param packet           Pointer to a milhoja::DataPacket prototype.  It is
 *                         dereferenced without a null check.
 * \param postTileWrapper  Pointer to a milhoja::TileWrapper prototype.  It is
 *                         dereferenced without a null check.
 * \param nThreads         Only validated; must not be negative.
 * \param tileInfo         Raw tile description; its sP, sI and sR members are
 *                         forwarded to the runtime.  It is dereferenced
 *                         without a null check.
 *
 * \return MILHOJA_SUCCESS on success; MILHOJA_ERROR_N_THREADS_NEGATIVE if
 *         nThreads is negative; MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS if the
 *         runtime throws.
 */
int milhoja_runtime_push_pipeline_extcpugpusplit_c(void* tileWrapper,
                                                   void* packet,
                                                   void* postTileWrapper,
                                                   const int nThreads,
                                                   FlashxTileRaw* tileInfo) {
    if (nThreads < 0) {
        std::cerr << "[milhoja_runtime_push_pipeline_extcpugpusplit_c] nThreads is negative" << std::endl;
        return MILHOJA_ERROR_N_THREADS_NEGATIVE;
    }

    // Recover the typed prototypes from the opaque handles of the C interface.
    auto* const preTile  = static_cast<milhoja::TileWrapper*>(tileWrapper);
    auto* const postTile = static_cast<milhoja::TileWrapper*>(postTileWrapper);
    auto* const pkt      = static_cast<milhoja::DataPacket*>(packet);

    int status = MILHOJA_SUCCESS;
    try {
        milhoja::Runtime::instance().pushTileToExtCpuGpuSplitPipeline("Lazy Bundle Name",
                                                                      *preTile,
                                                                      *pkt,
                                                                      *postTile,
                                                                      tileInfo->sP, tileInfo->sI, tileInfo->sR);
    } catch (const std::exception& err) {
        std::cerr << err.what() << std::endl;
        status = MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
    } catch (...) {
        std::cerr << "[milhoja_runtime_push_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
        status = MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
    }

    return status;
}
# endif

# ifdef RUNTIME_CAN_USE_TILEITER
Expand Down Expand Up @@ -1141,6 +1271,84 @@ extern "C" {

return MILHOJA_SUCCESS;
}

/**
 * Execute tasks with the extended CPU/GPU split configuration.
 *
 * Builds three RuntimeAction objects (CPU, GPU, and post-CPU) from the given
 * routines and forwards them, together with the prototypes, to
 * Runtime::executeExtendedCpuGpuSplitTasks.
 *
 * \param cpuTaskFunction      Routine assigned to the CPU action (BLOCK team
 *                             type)
 * \param gpuTaskFunction      Routine assigned to the GPU action
 *                             (SET_OF_BLOCKS team type)
 * \param postTaskFunction     Routine assigned to the post CPU action (BLOCK
 *                             team type)
 * \param nDistributorThreads  Forwarded unchanged to the runtime.  Must not be
 *                             negative.
 * \param nThreads             Initial thread count used for all three actions.
 *                             Must not be negative.
 * \param nTilesPerPacket      Tiles per packet for the GPU action.  Must not
 *                             be negative.
 * \param nTilesPerCpuTurn     Forwarded unchanged to the runtime.  Must not be
 *                             negative.
 * \param packet               Pointer to a milhoja::DataPacket prototype.  It
 *                             is dereferenced without a null check.
 * \param tileWrapper          Pointer to a milhoja::TileWrapper prototype.  It
 *                             is dereferenced without a null check.
 * \param postTileWrapper      Pointer to a milhoja::TileWrapper prototype.  It
 *                             is dereferenced without a null check.
 *
 * \return MILHOJA_SUCCESS on success; MILHOJA_ERROR_N_THREADS_NEGATIVE or
 *         MILHOJA_ERROR_N_TILES_NEGATIVE for invalid counts;
 *         MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS if the runtime throws.
 */
int milhoja_runtime_execute_tasks_extcpugpusplit_c(milhoja::ACTION_ROUTINE cpuTaskFunction,
                                                   milhoja::ACTION_ROUTINE gpuTaskFunction,
                                                   milhoja::ACTION_ROUTINE postTaskFunction,
                                                   const int nDistributorThreads,
                                                   const int nThreads,
                                                   const int nTilesPerPacket,
                                                   const int nTilesPerCpuTurn,
                                                   void* packet,
                                                   void* tileWrapper,
                                                   void* postTileWrapper) {
    if (nDistributorThreads < 0) {
        std::cerr
            << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nDistributorThreads is negative"
            << std::endl;
        return MILHOJA_ERROR_N_THREADS_NEGATIVE;
    } else if (nThreads < 0) {
        std::cerr
            << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nThreads is negative"
            << std::endl;
        return MILHOJA_ERROR_N_THREADS_NEGATIVE;
    } else if (nTilesPerPacket < 0) {
        std::cerr
            << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nTilesPerPacket is negative"
            << std::endl;
        return MILHOJA_ERROR_N_TILES_NEGATIVE;
    } else if (nTilesPerCpuTurn < 0) {
        // Without this check a negative value would wrap around to a huge
        // unsigned count in the cast below.
        std::cerr
            << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nTilesPerCpuTurn is negative"
            << std::endl;
        return MILHOJA_ERROR_N_TILES_NEGATIVE;
    }

    // All values are known to be non-negative, so these casts are lossless.
    const unsigned int nDistributorThreads_ui = static_cast<unsigned int>(nDistributorThreads);
    const unsigned int nThreads_ui            = static_cast<unsigned int>(nThreads);
    const unsigned int nTilesPerPacket_ui     = static_cast<unsigned int>(nTilesPerPacket);
    const unsigned int nTilesPerCpuTurn_ui    = static_cast<unsigned int>(nTilesPerCpuTurn);

    milhoja::TileWrapper* tilePrototype     = static_cast<milhoja::TileWrapper*>(tileWrapper);
    milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
    milhoja::DataPacket*  pktPrototype      = static_cast<milhoja::DataPacket*>(packet);

    milhoja::RuntimeAction pktAction;
    pktAction.name            = "Lazy GPU Action Name";
    pktAction.nInitialThreads = nThreads_ui;
    pktAction.teamType        = milhoja::ThreadTeamDataType::SET_OF_BLOCKS;
    pktAction.nTilesPerPacket = nTilesPerPacket_ui;
    pktAction.routine         = gpuTaskFunction;

    milhoja::RuntimeAction cpuAction;
    cpuAction.name            = "Lazy CPU Action Name";
    cpuAction.nInitialThreads = nThreads_ui;
    cpuAction.teamType        = milhoja::ThreadTeamDataType::BLOCK;
    cpuAction.nTilesPerPacket = 0;
    cpuAction.routine         = cpuTaskFunction;

    milhoja::RuntimeAction postAction;
    // Distinct from cpuAction.name so the two CPU actions can be told apart;
    // mirrors the naming used by the pipeline-setup variant of this function.
    postAction.name            = "Lazy post CPU Action Name";
    postAction.nInitialThreads = nThreads_ui;
    postAction.teamType        = milhoja::ThreadTeamDataType::BLOCK;
    postAction.nTilesPerPacket = 0;
    postAction.routine         = postTaskFunction;

    try {
        milhoja::Runtime::instance().executeExtendedCpuGpuSplitTasks("Lazy GPU Bundle Name",
                                                                     nDistributorThreads_ui,
                                                                     cpuAction,
                                                                     *tilePrototype,
                                                                     pktAction,
                                                                     *pktPrototype,
                                                                     postAction,
                                                                     *postTilePrototype,
                                                                     nTilesPerCpuTurn_ui);
    } catch (const std::exception& exc) {
        std::cerr << exc.what() << std::endl;
        return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
    } catch (...) {
        std::cerr << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] Unknown error caught" << std::endl;
        return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
    }

    return MILHOJA_SUCCESS;
}
# endif
#endif // #ifdef RUNTIME_SUPPORT_DATAPACKETS
}
Expand Down
Loading

0 comments on commit 262c450

Please sign in to comment.