Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ExtendedCpuGpuSplit pattern, and more #1

Merged
merged 10 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion includes/Milhoja_MoverUnpacker.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include <pthread.h>

#include "Milhoja_DataItem.h"
#include "Milhoja_TileWrapper.h"
#include "Milhoja_RuntimeElement.h"

Expand All @@ -63,7 +64,6 @@ class MoverUnpacker : public RuntimeElement {
MoverUnpacker& operator=(const MoverUnpacker&) = delete;
MoverUnpacker& operator=(MoverUnpacker&&) = delete;

void setReceiverProto(TileWrapper const *);
void startCycle(void);
void increaseThreadCount(const unsigned int nThreads) override;
void enqueue(std::shared_ptr<DataItem>&& dataItem) override;
Expand All @@ -72,6 +72,7 @@ class MoverUnpacker : public RuntimeElement {
void wait(void);

RuntimeElement* dataReceiver(void) const { return dataReceiver_; }
const DataItem* receiverPrototype(void) const { return receiverPrototype_; }

private:
enum class State {Idle, Open, Closed};
Expand Down
22 changes: 21 additions & 1 deletion includes/Milhoja_Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,30 @@ class Runtime {
void executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
const unsigned int nDistributorThreads,
const RuntimeAction& actionA_cpu,
const TileWrapper& tilePrototype,
const RuntimeAction& actionA_gpu,
const RuntimeAction& postActionB_cpu,
const DataPacket& packetPrototype,
const RuntimeAction& postActionB_cpu,
const TileWrapper& postTilePrototype,
const unsigned int nTilesPerCpuTurn);
# ifndef RUNTIME_MUST_USE_TILEITER
void setupPipelineForExtCpuGpuSplitTasks(const std::string& bundleName,
const RuntimeAction& actionA_cpu,
const TileWrapper& tilePrototype,
const RuntimeAction& actionA_gpu,
const DataPacket& packetPrototype,
const RuntimeAction& postActionB_cpu,
const TileWrapper& postTilePrototype,
const unsigned int nTilesPerCpuTurn);
void pushTileToExtCpuGpuSplitPipeline(const std::string& bundleName,
const TileWrapper& tilePrototype,
const DataPacket& packetPrototype,
const TileWrapper& postTilePrototype,
const FlashxrTileRawPtrs& tP,
const FlashxTileRawInts& tI,
const FlashxTileRawReals& tR);
void teardownPipelineForExtCpuGpuSplitTasks(const std::string& bundleName);
# endif
void executeCpuGpuWowzaTasks(const std::string& bundleName,
const RuntimeAction& actionA_cpu,
const TileWrapper& tilePrototype,
Expand Down
3 changes: 3 additions & 0 deletions includes/Milhoja_RuntimeElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RuntimeElement {
virtual std::string attachDataReceiver(RuntimeElement* receiver);
virtual std::string detachDataReceiver(void);

virtual std::string setReceiverPrototype(const DataItem* prototype);

protected:
RuntimeElement(void);
virtual ~RuntimeElement(void);
Expand All @@ -58,6 +60,7 @@ class RuntimeElement {
to once this team's action has
already been applied to the
items. */
const DataItem* receiverPrototype_;

std::map<const RuntimeElement*,bool> calledCloseQueue_; /*!< The keys in this map serve as a list
of data publishers attached to the object.
Expand Down
2 changes: 2 additions & 0 deletions includes/Milhoja_ThreadTeam.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

#include <pthread.h>

#include "Milhoja_TileWrapper.h"
#include "Milhoja_actionRoutine.h"
#include "Milhoja_RuntimeAction.h"
#include "Milhoja_ThreadTeamMode.h"
Expand Down Expand Up @@ -113,6 +114,7 @@ class ThreadTeam : public RuntimeElement {
// into thread team configurations.
std::string attachDataReceiver(RuntimeElement* receiver) override;
std::string detachDataReceiver(void) override;
std::string setReceiverPrototype(const DataItem* prototype) override;

protected:
constexpr static unsigned int THREAD_START_STOP_TIMEOUT_SEC = 1;
Expand Down
208 changes: 208 additions & 0 deletions interfaces/Milhoja_runtime_C_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,77 @@ extern "C" {
return MILHOJA_SUCCESS;
}

int milhoja_runtime_setup_pipeline_extcpugpusplit_c(milhoja::ACTION_ROUTINE cpuTaskFunction,
milhoja::ACTION_ROUTINE gpuTaskFunction,
milhoja::ACTION_ROUTINE postTaskFunction,
const int nThreads,
const int nTilesPerPacket,
const int nTilesPerCpuTurn,
void* packet,
void* tileWrapper,
void* postTileWrapper) {
if (nThreads < 0) {
std::cerr
<< "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nThreads is negative"
<< std::endl;
return MILHOJA_ERROR_N_THREADS_NEGATIVE;
} else if (nTilesPerPacket < 0) {
std::cerr
<< "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nTilesPerPacket is negative"
<< std::endl;
return MILHOJA_ERROR_N_TILES_NEGATIVE;
}

unsigned int nDistributorThreads_ui = 0;
unsigned int nThreads_ui = static_cast<unsigned int>(nThreads);
unsigned int nTilesPerPacket_ui = static_cast<unsigned int>(nTilesPerPacket);
unsigned int nTilesPerCpuTurn_ui = static_cast<unsigned int>(nTilesPerCpuTurn);

milhoja::TileWrapper* tilePrototype = static_cast<milhoja::TileWrapper*>(tileWrapper);
milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
milhoja::DataPacket* pktPrototype = static_cast<milhoja::DataPacket*>(packet);

milhoja::RuntimeAction pktAction;
pktAction.name = "Lazy GPU setup Action Name";
pktAction.nInitialThreads = nThreads_ui;
pktAction.teamType = milhoja::ThreadTeamDataType::SET_OF_BLOCKS;
pktAction.nTilesPerPacket = nTilesPerPacket_ui;
pktAction.routine = gpuTaskFunction;

milhoja::RuntimeAction cpuAction;
cpuAction.name = "Lazy CPU setup Action Name";
cpuAction.nInitialThreads = nThreads_ui;
cpuAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
cpuAction.nTilesPerPacket = 0;
cpuAction.routine = cpuTaskFunction;

milhoja::RuntimeAction postAction;
postAction.name = "Lazy post CPU setup Action Name";
postAction.nInitialThreads = nThreads_ui;
postAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
postAction.nTilesPerPacket = 0;
postAction.routine = postTaskFunction;

try {
milhoja::Runtime::instance().setupPipelineForExtCpuGpuSplitTasks("EXT CPUGPU Split Bundle Name",
cpuAction,
*tilePrototype,
pktAction,
*pktPrototype,
postAction,
*postTilePrototype,
nTilesPerCpuTurn_ui);
} catch (const std::exception& exc) {
std::cerr << exc.what() << std::endl;
return MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE;
} catch (...) {
std::cerr << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
return MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE;
}

return MILHOJA_SUCCESS;
}

int milhoja_runtime_teardown_pipeline_gpu_c(const int nThreads,
const int nTilesPerPacket) {
if (nThreads < 0) { // nThreads: only use in this function
Expand Down Expand Up @@ -679,6 +750,34 @@ extern "C" {

return MILHOJA_SUCCESS;
}

int milhoja_runtime_teardown_pipeline_extcpugpusplit_c(const int nThreads,
const int nTilesPerPacket) {
if (nThreads < 0) { // nThreads: only use in this function
std::cerr
<< "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] nThreads is negative"
<< std::endl;
return MILHOJA_ERROR_N_THREADS_NEGATIVE;
} else if (nTilesPerPacket < 0) { // nTilesPerPacket: only use here
std::cerr
<< "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] nTilesPerPacket is negative"
<< std::endl;
return MILHOJA_ERROR_N_TILES_NEGATIVE;
}

try {
milhoja::Runtime::instance().teardownPipelineForExtCpuGpuSplitTasks(
"Lazy EXT CPUGPU Split setup Bundle Name");
} catch (const std::exception& exc) {
std::cerr << exc.what() << std::endl;
return MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE;
} catch (...) {
std::cerr << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
return MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE;
}

return MILHOJA_SUCCESS;
}
/**
* Push one tile to the prepared pipeline so that the thread team will
* eventually execute the task.
Expand Down Expand Up @@ -797,6 +896,37 @@ extern "C" {

return MILHOJA_SUCCESS;
}

int milhoja_runtime_push_pipeline_extcpugpusplit_c(void* tileWrapper,
void* packet,
void* postTileWrapper,
const int nThreads,
FlashxTileRaw* tileInfo) {
if (nThreads < 0) {
std::cerr << "[milhoja_runtime_push_pipeline_extcpugpusplit_c] nThreads is negative" << std::endl;
return MILHOJA_ERROR_N_THREADS_NEGATIVE;
}

milhoja::TileWrapper* tilePrototype = static_cast<milhoja::TileWrapper*>(tileWrapper);
milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
milhoja::DataPacket* pktPrototype = static_cast<milhoja::DataPacket*>(packet);

try {
milhoja::Runtime::instance().pushTileToExtCpuGpuSplitPipeline("Lazy Bundle Name",
*tilePrototype,
*pktPrototype,
*postTilePrototype,
tileInfo->sP, tileInfo->sI, tileInfo->sR);
} catch (const std::exception& exc) {
std::cerr << exc.what() << std::endl;
return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
} catch (...) {
std::cerr << "[milhoja_runtime_push_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
}

return MILHOJA_SUCCESS;
}
# endif

# ifdef RUNTIME_CAN_USE_TILEITER
Expand Down Expand Up @@ -1141,6 +1271,84 @@ extern "C" {

return MILHOJA_SUCCESS;
}

int milhoja_runtime_execute_tasks_extcpugpusplit_c(milhoja::ACTION_ROUTINE cpuTaskFunction,
milhoja::ACTION_ROUTINE gpuTaskFunction,
milhoja::ACTION_ROUTINE postTaskFunction,
const int nDistributorThreads,
const int nThreads,
const int nTilesPerPacket,
const int nTilesPerCpuTurn,
void* packet,
void* tileWrapper,
void* postTileWrapper) {
if (nDistributorThreads < 0) {
std::cerr
<< "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nDistributorThreads is negative"
<< std::endl;
return MILHOJA_ERROR_N_THREADS_NEGATIVE;
} else if (nThreads < 0) {
std::cerr
<< "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nThreads is negative"
<< std::endl;
return MILHOJA_ERROR_N_THREADS_NEGATIVE;
} else if (nTilesPerPacket < 0) {
std::cerr
<< "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nTilesPerPacket is negative"
<< std::endl;
return MILHOJA_ERROR_N_TILES_NEGATIVE;
}

unsigned int nDistributorThreads_ui = static_cast<unsigned int>(nDistributorThreads);
unsigned int nThreads_ui = static_cast<unsigned int>(nThreads);
unsigned int nTilesPerPacket_ui = static_cast<unsigned int>(nTilesPerPacket);
unsigned int nTilesPerCpuTurn_ui = static_cast<unsigned int>(nTilesPerCpuTurn);

milhoja::TileWrapper* tilePrototype = static_cast<milhoja::TileWrapper*>(tileWrapper);
milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
milhoja::DataPacket* pktPrototype = static_cast<milhoja::DataPacket*>(packet);

milhoja::RuntimeAction pktAction;
pktAction.name = "Lazy GPU Action Name";
pktAction.nInitialThreads = nThreads_ui;
pktAction.teamType = milhoja::ThreadTeamDataType::SET_OF_BLOCKS;
pktAction.nTilesPerPacket = nTilesPerPacket_ui;
pktAction.routine = gpuTaskFunction;

milhoja::RuntimeAction cpuAction;
cpuAction.name = "Lazy CPU Action Name";
cpuAction.nInitialThreads = nThreads_ui;
cpuAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
cpuAction.nTilesPerPacket = 0;
cpuAction.routine = cpuTaskFunction;

milhoja::RuntimeAction postAction;
postAction.name = "Lazy CPU Action Name";
postAction.nInitialThreads = nThreads_ui;
postAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
postAction.nTilesPerPacket = 0;
postAction.routine = postTaskFunction;

try {
milhoja::Runtime::instance().executeExtendedCpuGpuSplitTasks("Lazy GPU Bundle Name",
nDistributorThreads_ui,
cpuAction,
*tilePrototype,
pktAction,
*pktPrototype,
postAction,
*postTilePrototype,
nTilesPerCpuTurn_ui);
} catch (const std::exception& exc) {
std::cerr << exc.what() << std::endl;
return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
} catch (...) {
std::cerr << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] Unknown error caught" << std::endl;
return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
}

return MILHOJA_SUCCESS;
}
# endif
#endif // #ifdef RUNTIME_SUPPORT_DATAPACKETS
}
Expand Down
Loading