Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update from the original #1

Merged
merged 10 commits into from
Sep 13, 2016
1 change: 1 addition & 0 deletions paddle/gserver/gradientmachines/NeuralNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ void NeuralNetwork::getState(MachineState& machineState) {
}

void NeuralNetwork::backward(const UpdateCallback& callback) {
gLayerStackTrace.pop(""); // tell layer trace is during backward.
FOR_EACH_R(layer, layers_) {
REGISTER_TIMER_INFO("BackwardTimer", (*layer)->getName().c_str());
if ((*layer)->needGradient()) {
Expand Down
27 changes: 15 additions & 12 deletions paddle/gserver/layers/CudnnBatchNormLayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,30 @@ void CudnnBatchNormLayer::backward(const UpdateCallback& callback) {
} else {
create(tmpBiasGrad_, 1, channels_, &betaGrad);
}
#if CUDNN_VERSION < 5000

// because of the different api of cudnn v4 and v5.
if (weight_->getWGrad()) {
create(tmpWGrad_, 1, channels_, &gammaGrad);
}
if (biases_ && biases_->getWGrad()) {
create(tmpBiasGrad_, 1, channels_, &betaGrad);
if (hl_get_cudnn_lib_version() < 5000) {
if (weight_->getWGrad()) {
create(tmpWGrad_, 1, channels_, &gammaGrad);
}
if (biases_ && biases_->getWGrad()) {
create(tmpBiasGrad_, 1, channels_, &betaGrad);
}
}
#endif

hl_batch_norm_backward(ioDesc_, input, ioDesc_, outGrad,
ioDesc_, inGrad, bnParamDesc_,
gamma, gammaGrad, betaGrad,
EPS, savedMean, savedInvVar);

#if CUDNN_VERSION < 5000
// because of the different api of cudnn v4 and v5.
if (weight_->getWGrad() && biases_->getWGrad()) {
weight_->getWGrad()->add(*tmpWGrad_);
biases_->getWGrad()->add(*tmpBiasGrad_);
if (hl_get_cudnn_lib_version() < 5000) {
if (weight_->getWGrad() && biases_->getWGrad()) {
weight_->getWGrad()->add(*tmpWGrad_);
biases_->getWGrad()->add(*tmpBiasGrad_);
}
}
#endif

{
REGISTER_TIMER_INFO("WeightUpdate", getName().c_str());
biases_->getParameterPtr()->incUpdate(callback);
Expand Down
35 changes: 35 additions & 0 deletions paddle/utils/CustomStackTrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,44 @@ limitations under the License. */


#include "CustomStackTrace.h"
#include "CommandLineParser.h"
#include <iostream>

P_DEFINE_bool(layer_stack_error_only_current_thread,
true,
"Dump current thread or whole process layer stack when signal error "
"occurred. true means only dump current thread layer stack");

namespace paddle {

CustomStackTrace<std::string> gLayerStackTrace;

static std::mutex gLayerStackTraceMtx;
void installLayerStackTracer() {
logging::installFailureWriter([](const char* data, int sz) {
std::lock_guard<std::mutex> guard(gLayerStackTraceMtx);
if (!gLayerStackTrace.empty()) {
size_t curTid = -1UL;
std::hash<std::thread::id> hasher;
gLayerStackTrace.dump([&curTid, &hasher](std::thread::id tid,
bool* isForwarding,
const std::string& layerName) {
if (curTid != hasher(tid)) {
if (curTid != -1UL) {
std::cerr << std::endl;
}
curTid = hasher(tid);
std::cerr << "Thread [" << tid << "] ";
if (isForwarding) {
std::cerr << (*isForwarding ? "Forwarding ": "Backwarding ");
}
}
std::cerr << layerName << ", ";
}, FLAGS_layer_stack_error_only_current_thread);
std::cerr << std::endl;
}
std::cerr.write(data, sz);
});
}

} // namespace paddle
164 changes: 128 additions & 36 deletions paddle/utils/CustomStackTrace.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ limitations under the License. */
#pragma once

#include <stack>
#include <thread>
#include <unordered_map>
#include <functional>

#include "ThreadLocal.h"

Expand All @@ -29,71 +32,160 @@ namespace paddle {
* @code{.cpp}
*
* paddle::CustomStackTrace<std::string> stack;
* PASS_TEST=0;
* for (auto& layer : layers){
* stack.push(layer->getName());
* layer->forward(passType);
* layer->forward();
* }
* for (auto& layer : layers){
*
* stack.pop(""); // mark under pop stage.
*
* for (auto it = layers.rbegin(); it != layers.rend(); ++it){
* auto& layer = *it;
* layer->backward(passType);
* stack.pop(layer->getName());
* }
*
* if(passType == PASS_TEST) {
* stack.clear();
* }
* else {
* stack.dump([](const std::string& layername){
* LOG(INFO) << "LayerName: " << layername;
* })
* }
*
*
* @endcode
*/
template <typename T>
class CustomStackTrace{
public:
/**
* @brief Pop out an item from the top of the stack. For safety the item
* will be poped should equal to ip.
* @brief Pop out an item from the top of the stack if item == top.
* Else, just set status to popping.
*/
void pop(const T& ip) {
auto& p = *logstack_;
CHECK_EQ(ip, p.top());
p.pop();
void pop(const T& item) {
pushing() = false;
auto& s = this->stack();
if (item == s.top()) {
s.pop();
}
}

/**
* @brief Empty the stack by sequence from top to button.
* @param[in] callback A function deal with each item while dumping.
* It must have and only have a in parameter which is the stack item.
* @brief clear current thread stack.
*/
template <typename Callback>
void dump(Callback callback) {
auto& p = *logstack_;
while (!p.empty()) {
callback(p.top());
p.pop();
void clear() {
auto& s = stack();
while (!s.empty()) {
s.pop();
}
}

/**
* @brief Only empty the stack.
* @brief return true if all thread's stack is empty.
* @return true if empty
*/
void clear() {
dump([](const T& ip){});
bool empty() const {
std::lock_guard<std::mutex> g(this->mtx_);
for (auto p : this->stackBuffers_) {
std::stack<T>& s = *p.second;
if (!s.empty()) {
return false;
}
}
return true;
}


/**
* @brief DumpCallback Type. It will be invoked many times by dump method.
*
* The first parameter is stack thread id.
* The second parameter is the last action of stack is push or not.
* The third parameter is the item in stack.
*/
typedef std::function<void(const std::thread::id& /*threadId*/,
bool* /*isPushing*/,
const T& /*item*/)> DumpCallback;

/**
* Dump all thread stack, and all stack will be cleared.
*/
void dump(const DumpCallback& callback, bool onlyCurrentThread = false) {
std::lock_guard<std::mutex> g(this->mtx_);
for (auto p : this->stackBuffers_) {
std::thread::id tid = p.first;
if (onlyCurrentThread && tid != std::this_thread::get_id()) {
continue;
}
std::stack<T>& s = *p.second;
bool* isPush = nullptr;
auto it = this->pushingBuffers_.find(tid);
if (it != this->pushingBuffers_.end()) {
isPush = it->second;
}

while (!s.empty()) {
callback(tid, isPush, s.top());
s.pop();
}
}
}

/**
* @brief Push item ip to the top of the stack.
* @brief Push item to current thread stack.
*/
void push(const T& ip) {
auto& p = *logstack_;
p.push(ip);
void push(const T& item) {
pushing() = true;
auto& p = this->stack();
p.push(item);
}

private:
ThreadLocalD<std::stack<T> > logstack_;
/**
* Get thread local attribute, and save them into a map (threadId => TYPE*)
*
* @tparam TYPE thread local attribute type.
* @param threadLocal Thread Local object.
* @param buffers a map from threadId to TYPE*
*/
template <typename TYPE>
inline TYPE& getThreadLocal(
ThreadLocal<TYPE>& threadLocal,
std::unordered_map<std::thread::id, TYPE*>& buffers) {
TYPE* retv = threadLocal.get(false);
if (retv) {
return *retv;
} else {
std::lock_guard<std::mutex> guard(this->mtx_);
retv = threadLocal.get();
auto id = std::this_thread::get_id();
buffers.insert({id, retv});
return *retv;
}
}

/**
* @brief Get thread local stack reference.
*/
std::stack<T>& stack() {
return this->getThreadLocal(this->logStack_,
this->stackBuffers_);
}

/**
* @brief Get thread local pushing flag.
*/
bool& pushing() {
return this->getThreadLocal(this->isPushing_,
this->pushingBuffers_);
}

private:
mutable std::mutex mtx_;

std::unordered_map<std::thread::id, std::stack<T>* > stackBuffers_;
std::unordered_map<std::thread::id, bool* > pushingBuffers_;
ThreadLocal<bool> isPushing_;
ThreadLocal<std::stack<T> > logStack_;
};

extern CustomStackTrace<std::string> gLayerStackTrace;

/**
* @brief Install a failure handler to print layer stack when error.
*/
extern void installLayerStackTracer();

} // namespace paddle
8 changes: 1 addition & 7 deletions paddle/utils/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,7 @@ void runInitFunctions() {

void initMain(int argc, char** argv) {
initializeLogging(argc, argv);
logging::installFailureWriter([](const char* data, int sz) {
std::cerr << "Current Layer forward/backward stack is " << std::endl;
gLayerStackTrace.dump([](const std::string& layername){
std::cerr << "LayerName: " << layername << std::endl;
});
std::cerr.write(data, sz);
});
installLayerStackTracer();
std::string line;
for (int i = 0; i < argc; ++i) {
line += argv[i];
Expand Down
10 changes: 10 additions & 0 deletions paddle/utils/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,13 @@ add_simple_unittest(test_CommandLineParser)
add_simple_unittest(test_Logging)
add_simple_unittest(test_Thread)
add_simple_unittest(test_StringUtils)
add_simple_unittest(test_CustomStackTrace)

add_executable(
test_CustomStackTracePrint
test_CustomStackTracePrint.cpp
)
link_paddle_exe(test_CustomStackTracePrint)
add_test(NAME test_CustomStackTracePrint
COMMAND ${PROJ_ROOT}/paddle/utils/tests/test_CustomStackTracePrint.sh
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
Loading