Skip to content

Commit

Permalink
better handling for data receiver's prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
ylee88 committed Oct 25, 2024
1 parent 5e9ec79 commit 2f9cab6
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 44 deletions.
3 changes: 2 additions & 1 deletion includes/Milhoja_MoverUnpacker.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include <pthread.h>

#include "Milhoja_DataItem.h"
#include "Milhoja_TileWrapper.h"
#include "Milhoja_RuntimeElement.h"

Expand All @@ -63,7 +64,6 @@ class MoverUnpacker : public RuntimeElement {
MoverUnpacker& operator=(const MoverUnpacker&) = delete;
MoverUnpacker& operator=(MoverUnpacker&&) = delete;

void setReceiverProto(TileWrapper const *);
void startCycle(void);
void increaseThreadCount(const unsigned int nThreads) override;
void enqueue(std::shared_ptr<DataItem>&& dataItem) override;
Expand All @@ -72,6 +72,7 @@ class MoverUnpacker : public RuntimeElement {
void wait(void);

RuntimeElement* dataReceiver(void) const { return dataReceiver_; }
const DataItem* receiverPrototype(void) const { return receiverPrototype_; }

private:
enum class State {Idle, Open, Closed};
Expand Down
3 changes: 3 additions & 0 deletions includes/Milhoja_RuntimeElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RuntimeElement {
virtual std::string attachDataReceiver(RuntimeElement* receiver);
virtual std::string detachDataReceiver(void);

virtual std::string setReceiverPrototype(const DataItem* prototype);

protected:
RuntimeElement(void);
virtual ~RuntimeElement(void);
Expand All @@ -58,6 +60,7 @@ class RuntimeElement {
to once this team's action has
already been applied to the
items. */
const DataItem* receiverPrototype_;

std::map<const RuntimeElement*,bool> calledCloseQueue_; /*!< The keys in this map serve as a list
of data publishers attached to the object.
Expand Down
3 changes: 1 addition & 2 deletions includes/Milhoja_ThreadTeam.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class ThreadTeam : public RuntimeElement {
// into thread team configurations.
std::string attachDataReceiver(RuntimeElement* receiver) override;
std::string detachDataReceiver(void) override;
void setReceiverProto(TileWrapper const * w);
std::string setReceiverPrototype(const DataItem* prototype) override;

protected:
constexpr static unsigned int THREAD_START_STOP_TIMEOUT_SEC = 1;
Expand Down Expand Up @@ -215,7 +215,6 @@ class ThreadTeam : public RuntimeElement {

// Keep track of when wait() is blocking and when it is released
bool isWaitBlocking_; //!< Only a single thread can be blocked
const TileWrapper * receiverProto_;
};

}
Expand Down
31 changes: 10 additions & 21 deletions src/Milhoja_MoverUnpacker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,17 @@ void MoverUnpacker::handleTransferFinished(void* userData) {

// Transfer the ownership of the data items in the packet to the next team
if (dataReceiver) {
while (packet->nTiles() > 0) {
#if(0)
std::shared_ptr<Tile> curTile = std::move(packet->popTile());
std::shared_ptr<TileWrapper> wrappedTile =
unpacker->tileProto_->clone( std::move(curTile) );
dataReceiver->enqueue( std::move(wrappedTile) );
#endif
dataReceiver->enqueue(
unpacker->tileProto_->clone(packet->popTile())
);
auto receiverPrototype = unpacker->receiverPrototype();
if (receiverPrototype) {
const TileWrapper* tileWrapperPrototype =
dynamic_cast<const TileWrapper*>(receiverPrototype);
while (packet->nTiles() > 0) {
dataReceiver->enqueue(
tileWrapperPrototype->clone(packet->popTile())
);
}
dataReceiver = nullptr;
}
dataReceiver = nullptr;
}
packet = nullptr;

Expand Down Expand Up @@ -312,15 +311,5 @@ void MoverUnpacker::wait(void) {
pthread_mutex_unlock(&mutex_);
}

void MoverUnpacker::setReceiverProto(TileWrapper const * w) {

if (state_ != State::Idle) {
throw std::logic_error("[MoverUnpacker::setReceiverProto] "
"This setter should only be called in Idle state");
}
tileProto_ = w;

}

}

12 changes: 6 additions & 6 deletions src/Milhoja_Runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ void Runtime::executeExtendedGpuTasks(const std::string& bundleName,
gpuTeam->attachThreadReceiver(postGpuTeam);
gpuTeam->attachDataReceiver(&gpuToHost1_);
gpuToHost1_.attachDataReceiver(postGpuTeam);
gpuToHost1_.setReceiverProto(&tilePrototype);
gpuToHost1_.setReceiverPrototype(&tilePrototype);

unsigned int nTotalThreads = gpuAction.nInitialThreads
+ postGpuAction.nInitialThreads
Expand Down Expand Up @@ -1252,7 +1252,7 @@ void Runtime::setupPipelineForExtGpuTasks(const std::string& bundleName,
gpuTeam->attachThreadReceiver(postGpuTeam);
gpuTeam->attachDataReceiver(&gpuToHost1_);
gpuToHost1_.attachDataReceiver(postGpuTeam);
gpuToHost1_.setReceiverProto(&tilePrototype);
gpuToHost1_.setReceiverPrototype(&tilePrototype);

unsigned int nTotalThreads = gpuAction.nInitialThreads
+ postGpuAction.nInitialThreads
Expand Down Expand Up @@ -2099,10 +2099,10 @@ void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,

teamA_cpu->attachThreadReceiver(teamB_cpu);
teamA_cpu->attachDataReceiver(teamB_cpu);
teamA_cpu->setReceiverProto(&postTilePrototype);
teamA_cpu->setReceiverPrototype(&postTilePrototype);
teamA_gpu->attachDataReceiver(&gpuToHost1_);
gpuToHost1_.attachDataReceiver(teamB_cpu);
gpuToHost1_.setReceiverProto(&postTilePrototype);
gpuToHost1_.setReceiverPrototype(&postTilePrototype);

// The action parallel distributor's thread resource is used
// once the distributor starts to wait
Expand Down Expand Up @@ -2274,10 +2274,10 @@ void Runtime::setupPipelineForExtCpuGpuSplitTasks(const std::string& bundleName,

teamA_cpu->attachThreadReceiver(teamB_cpu);
teamA_cpu->attachDataReceiver(teamB_cpu);
teamA_cpu->setReceiverProto(&postTilePrototype);
teamA_cpu->setReceiverPrototype(&postTilePrototype);
teamA_gpu->attachDataReceiver(&gpuToHost1_);
gpuToHost1_.attachDataReceiver(teamB_cpu);
gpuToHost1_.setReceiverProto(&postTilePrototype);
gpuToHost1_.setReceiverPrototype(&postTilePrototype);

// The action parallel distributor's thread resource is used
// once the distributor starts to wait
Expand Down
31 changes: 30 additions & 1 deletion src/Milhoja_RuntimeElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using namespace milhoja;
RuntimeElement::RuntimeElement(void)
: threadReceiver_{nullptr},
dataReceiver_{nullptr},
receiverPrototype_{nullptr},
calledCloseQueue_{}
{ }

Expand All @@ -17,6 +18,9 @@ RuntimeElement::~RuntimeElement(void) {
if (dataReceiver_) {
std::cerr << "[RuntimeElement::~RuntimeElement] Data Subscriber still attached\n";
}
if (receiverPrototype_) {
std::cerr << "[RuntimeElement::~RuntimeElement] Receiver Prototype still set\n";
}
if (!calledCloseQueue_.empty()) {
std::cerr << "[RuntimeElement::~RuntimeElement] Data publishers still attached\n";
// FIXME: Does this help prevent valgrind from finding potential pointer
Expand Down Expand Up @@ -105,7 +109,10 @@ std::string RuntimeElement::detachDataReceiver(void) {
}

dataReceiver_ = nullptr;


// if it has a receiver's prototype, release it
receiverPrototype_ = nullptr;

return "";
}

Expand Down Expand Up @@ -155,3 +162,25 @@ std::string RuntimeElement::detachDataPublisher(const RuntimeElement* publisher)
return "";
}

/**
* Set the data receiver's prototype for later use when passing
* a DataItem to the data receiver, for calling a proper constructor.
* Note that the receiver's prototype is only required for passing TilwWrapper, currently.
* Thus, calling this function for the DataPacket has no effect.
* The receiverPrototype_ will be nullified when RuntimeElement::detachDataReceiver is called.
*
* \param prototype - A prototype of a DataItem to be passed to the DataReceiver.
*/
std::string RuntimeElement::setReceiverPrototype(const DataItem* prototype) {

if (!prototype) {
return "Null receiver prototype is given";
} else if (receiverPrototype_) {
return "A receiver prototype is already given";
}

receiverPrototype_ = prototype;

return "";
}

67 changes: 54 additions & 13 deletions src/Milhoja_ThreadTeam.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "Milhoja_ThreadTeam.h"

#include <memory>
#include <sys/time.h>
#include <iostream>
#include <stdexcept>
Expand Down Expand Up @@ -908,9 +909,42 @@ std::string ThreadTeam::attachDataReceiver(RuntimeElement* receiver) {
return "";
}

void ThreadTeam::setReceiverProto(TileWrapper const * w) {
// TODO: should I mutex lock/unlock?
receiverProto_ = w;

/**
*
*/
std::string ThreadTeam::setReceiverPrototype(const DataItem* prototype) {
pthread_mutex_lock(&teamMutex_);

std::string errMsg("");
if (!state_) {
errMsg = printState_NotThreadsafe("setReceiverPrototype", 0,
"state_ is NULL");
pthread_mutex_unlock(&teamMutex_);
throw std::runtime_error(errMsg);
}
std::string msg = state_->isStateValid_NotThreadSafe();
if (msg != "") {
errMsg = printState_NotThreadsafe("setReceiverPrototype", 0, msg);
pthread_mutex_unlock(&teamMutex_);
throw std::runtime_error(errMsg);
} else if (state_->mode() != ThreadTeamMode::IDLE) {
errMsg = printState_NotThreadsafe("setReceiverPrototype", 0,
"A team can only be attached in the Idle mode");
pthread_mutex_unlock(&teamMutex_);
throw std::logic_error(errMsg);
}

errMsg = RuntimeElement::setReceiverPrototype(prototype);
if (errMsg != "") {
errMsg = printState_NotThreadsafe("setReceiverPrototype", 0, errMsg);
pthread_mutex_unlock(&teamMutex_);
throw std::logic_error(errMsg);
}

pthread_mutex_unlock(&teamMutex_);

return "";
}

/**
Expand Down Expand Up @@ -1479,17 +1513,24 @@ void* ThreadTeam::threadRoutine(void* varg) {

if (team->dataReceiver_) {
// Move the data item along so that dataItem is null
// TODO: very dirty ownership transfers
if (auto tileWrapper = std::dynamic_pointer_cast<TileWrapper>(dataItem)) {
// NOTE: this is the case where dataItem is a TilwWrapper,
// and the team->dataReceiver_ is another TileWrapper.
// Need to transfer dataItem initialized with data receiver's
// tileProtoType, as it may differ.
std::unique_ptr<TileWrapper> clonedTile =
team->receiverProto_->clone(std::move(tileWrapper->tile_));
// Release ownership, assuming clonedTile has new ownership
dataItem.reset();
team->dataReceiver_->enqueue(std::move(clonedTile));
if (auto tileWrapperPrototype =
dynamic_cast<const TileWrapper*>(team->receiverPrototype_)) {
// NOTE: this is the case where dataItem is a TilwWrapper,
// and the team->dataReceiver_ is another TileWrapper.

This comment has been minimized.

Copy link
@kweide

kweide Nov 19, 2024

Member

This comment is wrong... team->dataReceiver_ cannot be a TileWrapper.
Maybe this should say

                       //       and the team->receiverPrototype_ is another TileWrapper.

or

                       //       and the team->dataReceiver_ is a RuntimeElement whose
                       //       enqueue() method expects an argument of type TileWrapper.

?

This comment has been minimized.

Copy link
@ylee88

ylee88 Nov 19, 2024

Author Collaborator

Addressed in 513f2bd.

// Need to transfer dataItem initialized with data receiver's
// tileProtoType, as it may differ.
// TODO: very dirty ownership transfers
std::unique_ptr<TileWrapper> clonedTileWrapper =
tileWrapperPrototype->clone(std::move(tileWrapper->tile_));
// Release ownership, assuming clonedTileWrapper has new ownership
dataItem.reset();
team->dataReceiver_->enqueue(std::move(clonedTileWrapper));
}
else {
// receiver prototype is not a tilewrapper. do the normal thing
team->dataReceiver_->enqueue(std::move(dataItem));
}
}
else {
// the data receiver is a mover/unpacker
Expand Down

0 comments on commit 2f9cab6

Please sign in to comment.