Merge pull request #1 from baidu/master
Update from the original
emailweixu authored Sep 13, 2016
2 parents 674d69c + 3fc99a2 commit 5c6ecb2
Showing 9 changed files with 329 additions and 55 deletions.
1 change: 1 addition & 0 deletions paddle/gserver/gradientmachines/NeuralNetwork.cpp
@@ -277,6 +277,7 @@ void NeuralNetwork::getState(MachineState& machineState) {
}

void NeuralNetwork::backward(const UpdateCallback& callback) {
gLayerStackTrace.pop("");  // tell the layer stack trace that the backward pass has begun.
FOR_EACH_R(layer, layers_) {
REGISTER_TIMER_INFO("BackwardTimer", (*layer)->getName().c_str());
if ((*layer)->needGradient()) {
27 changes: 15 additions & 12 deletions paddle/gserver/layers/CudnnBatchNormLayer.cpp
@@ -114,27 +114,30 @@ void CudnnBatchNormLayer::backward(const UpdateCallback& callback) {
} else {
create(tmpBiasGrad_, 1, channels_, &betaGrad);
}
#if CUDNN_VERSION < 5000

// because of the different api of cudnn v4 and v5.
if (weight_->getWGrad()) {
create(tmpWGrad_, 1, channels_, &gammaGrad);
}
if (biases_ && biases_->getWGrad()) {
create(tmpBiasGrad_, 1, channels_, &betaGrad);
if (hl_get_cudnn_lib_version() < 5000) {
if (weight_->getWGrad()) {
create(tmpWGrad_, 1, channels_, &gammaGrad);
}
if (biases_ && biases_->getWGrad()) {
create(tmpBiasGrad_, 1, channels_, &betaGrad);
}
}
#endif

hl_batch_norm_backward(ioDesc_, input, ioDesc_, outGrad,
ioDesc_, inGrad, bnParamDesc_,
gamma, gammaGrad, betaGrad,
EPS, savedMean, savedInvVar);

#if CUDNN_VERSION < 5000
// because of the different api of cudnn v4 and v5.
if (weight_->getWGrad() && biases_->getWGrad()) {
weight_->getWGrad()->add(*tmpWGrad_);
biases_->getWGrad()->add(*tmpBiasGrad_);
if (hl_get_cudnn_lib_version() < 5000) {
if (weight_->getWGrad() && biases_->getWGrad()) {
weight_->getWGrad()->add(*tmpWGrad_);
biases_->getWGrad()->add(*tmpBiasGrad_);
}
}
#endif

{
REGISTER_TIMER_INFO("WeightUpdate", getName().c_str());
biases_->getParameterPtr()->incUpdate(callback);
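
The rewritten guard above trades the compile-time CUDNN_VERSION macro for a runtime call, presumably so the v4 workaround follows the cuDNN library actually loaded rather than the headers the binary was built against. A minimal sketch of that distinction, assuming hl_get_cudnn_lib_version() wraps cuDNN's cudnnGetVersion() (illustrative only, not part of the commit):

#include <cudnn.h>
#include <cstdio>

int main() {
  // CUDNN_VERSION is fixed by the headers at compile time;
  // cudnnGetVersion() reports the library resolved at run time.
  size_t runtimeVersion = cudnnGetVersion();
  std::printf("built against cuDNN %d, running with cuDNN %zu\n",
              CUDNN_VERSION, runtimeVersion);
  if (runtimeVersion < 5000) {
    // Take the v4-style path (separate gradient buffers), as the layer now does.
  }
  return 0;
}
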
35 changes: 35 additions & 0 deletions paddle/utils/CustomStackTrace.cpp
@@ -14,9 +14,44 @@ limitations under the License. */


#include "CustomStackTrace.h"
#include "CommandLineParser.h"
#include <iostream>

P_DEFINE_bool(layer_stack_error_only_current_thread,
true,
"Dump current thread or whole process layer stack when signal error "
"occurred. true means only dump current thread layer stack");

namespace paddle {

CustomStackTrace<std::string> gLayerStackTrace;

static std::mutex gLayerStackTraceMtx;
void installLayerStackTracer() {
logging::installFailureWriter([](const char* data, int sz) {
std::lock_guard<std::mutex> guard(gLayerStackTraceMtx);
if (!gLayerStackTrace.empty()) {
size_t curTid = -1UL;
std::hash<std::thread::id> hasher;
gLayerStackTrace.dump([&curTid, &hasher](std::thread::id tid,
bool* isForwarding,
const std::string& layerName) {
if (curTid != hasher(tid)) {
if (curTid != -1UL) {
std::cerr << std::endl;
}
curTid = hasher(tid);
std::cerr << "Thread [" << tid << "] ";
if (isForwarding) {
std::cerr << (*isForwarding ? "Forwarding ": "Backwarding ");
}
}
std::cerr << layerName << ", ";
}, FLAGS_layer_stack_error_only_current_thread);
std::cerr << std::endl;
}
std::cerr.write(data, sz);
});
}

} // namespace paddle
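
With this handler installed, a fatal error first dumps the surviving layer stacks, grouped per thread with the most recently entered layer first (dump() pops from the top of each stack), and then writes the original failure message. With purely illustrative thread ids and layer names, the stderr output produced by the lambda above would look roughly like:

Thread [139700123] Forwarding cost, fc2, fc1, data,
Thread [139700456] Forwarding pool1, conv1, data,
<original failure message passed in via data/sz>

The new layer_stack_error_only_current_thread flag (default true) limits the dump to the thread that hit the error; setting it to false dumps every thread's stack.
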
164 changes: 128 additions & 36 deletions paddle/utils/CustomStackTrace.h
@@ -15,6 +15,9 @@ limitations under the License. */
#pragma once

#include <stack>
#include <thread>
#include <unordered_map>
#include <functional>

#include "ThreadLocal.h"

@@ -29,71 +32,160 @@ namespace paddle {
* @code{.cpp}
*
* paddle::CustomStackTrace<std::string> stack;
* PASS_TEST=0;
* for (auto& layer : layers){
* stack.push(layer->getName());
* layer->forward(passType);
* layer->forward();
* }
* for (auto& layer : layers){
*
* stack.pop(""); // mark under pop stage.
*
* for (auto it = layers.rbegin(); it != layers.rend(); ++it){
* auto& layer = *it;
* layer->backward(passType);
* stack.pop(layer->getName());
* }
*
* if(passType == PASS_TEST) {
* stack.clear();
* }
* else {
* stack.dump([](const std::string& layername){
* LOG(INFO) << "LayerName: " << layername;
* })
* }
*
*
* @endcode
*/
template <typename T>
class CustomStackTrace{
public:
/**
* @brief Pop out an item from the top of the stack. For safety the item
* will be poped should equal to ip.
* @brief Pop out an item from the top of the stack if item == top.
* Else, just set status to popping.
*/
void pop(const T& ip) {
auto& p = *logstack_;
CHECK_EQ(ip, p.top());
p.pop();
void pop(const T& item) {
pushing() = false;
auto& s = this->stack();
if (item == s.top()) {
s.pop();
}
}

/**
* @brief Empty the stack by sequence from top to button.
* @param[in] callback A function deal with each item while dumping.
* It must have and only have a in parameter which is the stack item.
* @brief clear current thread stack.
*/
template <typename Callback>
void dump(Callback callback) {
auto& p = *logstack_;
while (!p.empty()) {
callback(p.top());
p.pop();
void clear() {
auto& s = stack();
while (!s.empty()) {
s.pop();
}
}

/**
* @brief Only empty the stack.
* @brief Return true if every thread's stack is empty.
* @return true if empty
*/
void clear() {
dump([](const T& ip){});
bool empty() const {
std::lock_guard<std::mutex> g(this->mtx_);
for (auto p : this->stackBuffers_) {
std::stack<T>& s = *p.second;
if (!s.empty()) {
return false;
}
}
return true;
}


/**
* @brief DumpCallback type. It is invoked once per stack item by the dump method.
*
* The first parameter is stack thread id.
* The second parameter is the last action of stack is push or not.
* The third parameter is the item in stack.
*/
typedef std::function<void(const std::thread::id& /*threadId*/,
bool* /*isPushing*/,
const T& /*item*/)> DumpCallback;

/**
* Dump all threads' stacks; each stack is cleared as it is dumped.
*/
void dump(const DumpCallback& callback, bool onlyCurrentThread = false) {
std::lock_guard<std::mutex> g(this->mtx_);
for (auto p : this->stackBuffers_) {
std::thread::id tid = p.first;
if (onlyCurrentThread && tid != std::this_thread::get_id()) {
continue;
}
std::stack<T>& s = *p.second;
bool* isPush = nullptr;
auto it = this->pushingBuffers_.find(tid);
if (it != this->pushingBuffers_.end()) {
isPush = it->second;
}

while (!s.empty()) {
callback(tid, isPush, s.top());
s.pop();
}
}
}

/**
* @brief Push item ip to the top of the stack.
* @brief Push item to current thread stack.
*/
void push(const T& ip) {
auto& p = *logstack_;
p.push(ip);
void push(const T& item) {
pushing() = true;
auto& p = this->stack();
p.push(item);
}

private:
ThreadLocalD<std::stack<T> > logstack_;
/**
* Get a thread-local attribute and record it in a map (threadId => TYPE*).
*
* @tparam TYPE thread local attribute type.
* @param threadLocal Thread Local object.
* @param buffers a map from threadId to TYPE*
*/
template <typename TYPE>
inline TYPE& getThreadLocal(
ThreadLocal<TYPE>& threadLocal,
std::unordered_map<std::thread::id, TYPE*>& buffers) {
TYPE* retv = threadLocal.get(false);
if (retv) {
return *retv;
} else {
std::lock_guard<std::mutex> guard(this->mtx_);
retv = threadLocal.get();
auto id = std::this_thread::get_id();
buffers.insert({id, retv});
return *retv;
}
}

/**
* @brief Get thread local stack reference.
*/
std::stack<T>& stack() {
return this->getThreadLocal(this->logStack_,
this->stackBuffers_);
}

/**
* @brief Get thread local pushing flag.
*/
bool& pushing() {
return this->getThreadLocal(this->isPushing_,
this->pushingBuffers_);
}

private:
mutable std::mutex mtx_;

std::unordered_map<std::thread::id, std::stack<T>* > stackBuffers_;
std::unordered_map<std::thread::id, bool* > pushingBuffers_;
ThreadLocal<bool> isPushing_;
ThreadLocal<std::stack<T> > logStack_;
};

extern CustomStackTrace<std::string> gLayerStackTrace;

/**
* @brief Install a failure handler to print layer stack when error.
*/
extern void installLayerStackTracer();

} // namespace paddle
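
Taken together, the new interface can be exercised roughly as below. This is a minimal sketch against the header shown above; the layer names and the simulated early abort are illustrative, and in Paddle itself this bookkeeping is done by NeuralNetwork::forward/backward with the dump installed by installLayerStackTracer().

#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "paddle/utils/CustomStackTrace.h"

int main() {
  paddle::CustomStackTrace<std::string> stack;
  std::vector<std::string> layers = {"data", "fc1", "fc2", "cost"};

  // Forward pass: push each layer as it starts.
  for (auto& name : layers) {
    stack.push(name);
  }

  // Mark the switch to the backward pass; pop("") only clears the pushing flag.
  stack.pop("");

  // Backward pass: pop layers in reverse order, stopping early to mimic a crash.
  stack.pop("cost");
  stack.pop("fc2");

  // What a failure handler would see: the layers still on the stack,
  // grouped by thread, most recently entered layer first.
  if (!stack.empty()) {
    stack.dump([](const std::thread::id& tid, bool* isPushing,
                  const std::string& layerName) {
      std::cerr << "Thread [" << tid << "] ";
      if (isPushing != nullptr) {
        std::cerr << (*isPushing ? "Forwarding " : "Backwarding ");
      }
      std::cerr << layerName << std::endl;
    });
  }
  return 0;
}
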
8 changes: 1 addition & 7 deletions paddle/utils/Util.cpp
@@ -129,13 +129,7 @@ void runInitFunctions() {

void initMain(int argc, char** argv) {
initializeLogging(argc, argv);
logging::installFailureWriter([](const char* data, int sz) {
std::cerr << "Current Layer forward/backward stack is " << std::endl;
gLayerStackTrace.dump([](const std::string& layername){
std::cerr << "LayerName: " << layername << std::endl;
});
std::cerr.write(data, sz);
});
installLayerStackTracer();
std::string line;
for (int i = 0; i < argc; ++i) {
line += argv[i];
10 changes: 10 additions & 0 deletions paddle/utils/tests/CMakeLists.txt
@@ -2,3 +2,13 @@ add_simple_unittest(test_CommandLineParser)
add_simple_unittest(test_Logging)
add_simple_unittest(test_Thread)
add_simple_unittest(test_StringUtils)
add_simple_unittest(test_CustomStackTrace)

add_executable(
test_CustomStackTracePrint
test_CustomStackTracePrint.cpp
)
link_paddle_exe(test_CustomStackTracePrint)
add_test(NAME test_CustomStackTracePrint
COMMAND ${PROJ_ROOT}/paddle/utils/tests/test_CustomStackTracePrint.sh
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})