Fix trace profile 2 #15

Merged
merged 3 commits on Oct 16, 2023
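The change repeated across these files: scalopus includes and trace-scope calls are wrapped in a compile-time guard, so builds without TRACE_PROFILE (and, for most files, without an XPU target) never see the tracing code. A minimal sketch of the guard pattern, assuming the TRACE_SCOPE_START/TRACE_SCOPE_END wrapper macros and dev_ctx_->Wait() behave as used in boxps_worker.cc below; the surrounding function is illustrative only.

// Sketch of the guard pattern applied throughout this PR. The second macro
// argument is a statement executed at the span boundary; here it drains the
// device queue so the recorded span lines up with completed device work.
#if defined(TRACE_PROFILE) && \
    (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
#include <scalopus_tracing/tracing.h>
#endif

void PackBatchTraced() {  // illustrative stand-in for BoxPSWorker's reader loop
#if defined(TRACE_PROFILE) && \
    (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
  TRACE_SCOPE_START("PackBatchTask", dev_ctx_->Wait());
#endif
  // ... work attributed to the "PackBatchTask" span ...
#if defined(TRACE_PROFILE) && \
    (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
  TRACE_SCOPE_END("PackBatchTask", dev_ctx_->Wait());
#endif
}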
1 change: 0 additions & 1 deletion paddle/fluid/framework/CMakeLists.txt
@@ -10,7 +10,6 @@ if(TRACE_PROFILE)
link_directories("${SCALOPUS_PATH}/scalopus/so/scalopus_tracing")
link_directories("${SCALOPUS_PATH}/scalopus/so/thirdparty/seasocks/src/main/c/")
endif()

# Windows treats a symbolic file as a real file, which is different from Unix.
# We create a hidden file and compile it instead of the original source file.
function(windows_symbolic TARGET)
16 changes: 15 additions & 1 deletion paddle/fluid/framework/boxps_worker.cc
@@ -31,6 +31,7 @@ limitations under the License. */
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#endif

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
@@ -39,6 +40,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

DECLARE_bool(enable_sync_dense_moment);
DECLARE_bool(check_nan_inf);
@@ -846,9 +848,13 @@ void BoxPSWorker::TrainFilesWithProfiler() {
main_timer.Resume();

reader_timer.Resume();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("PackBatchTask", dev_ctx_->Wait());
#endif
batch_size = PackBatchTask();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("PackBatchTask", dev_ctx_->Wait());
#endif
reader_timer.Pause();
if (batch_size <= 0) {
break;
@@ -860,22 +866,30 @@ void BoxPSWorker::TrainFilesWithProfiler() {
cal_timer.Resume();
int op_id = 0;
dev_ctx_->Wait();
std::vector<std::string> op_names;
// std::vector<std::string> op_names;
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("ops run",);
#endif
for (auto& op : ops_) {
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_START((op->Type()+" run").c_str(),);
#endif
timeline.Start();
op->Run(*thread_scope_, place_);
dev_ctx_->Wait();
timeline.Pause();
op_total_time[op_id++] += timeline.ElapsedUS();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_END((op->Type()+" run").c_str(),);
#endif
if (gc) {
DeleteUnusedTensors(*thread_scope_, op.get(), unused_vars_, gc.get());
}
}
dev_ctx_->Wait();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("ops run",);
#endif
cal_timer.Pause();
#if defined(PADDLE_WITH_CUDA)
if (FLAGS_check_nan_inf) {
9 changes: 8 additions & 1 deletion paddle/fluid/framework/executor.cc
@@ -492,22 +492,29 @@ void Executor::RunPartialPreparedContext(ExecutorPrepareContext* ctx,
gc = CreateGarbageCollector(place_, max_memory_size);
}

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("executor ops run",);
#endif
for (int64_t i = start_op_index; i < end_op_index; ++i) {
auto& op = ctx->ops_[i];
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
xpu_wait();
RUNTIME_TRACE_SCOPE_START((op->Type()+" run").c_str(),);
#endif
op->Run(*local_scope, place_);
if (gc) {
platform::RecordEvent record(
"CheckGC", platform::TracerEventType::UserDefined, 10);
DeleteUnusedTensors(*local_scope, op.get(), ctx->unused_vars_, gc.get());
}
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_END((op->Type()+" run").c_str(),);
xpu_wait();
#endif
}
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("executor ops run",);

#endif
auto callback = [scope, local_scope, keep_kids]() {
if (local_scope != scope) {
VLOG(4) << "Delete scope: " << local_scope;
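A note on the ordering above: each per-op span in Executor::RunPartialPreparedContext (and in NaiveExecutor::Run further down) is bracketed by xpu_wait(), draining the XPU stream before a span opens and again after it closes, so that asynchronously enqueued kernels are not attributed to a neighbouring op. A condensed sketch of the loop, assuming RUNTIME_TRACE_SCOPE_START/END are the Paddle wrapper macros used in this diff and ops, local_scope, place_ stand in for the executor's own state:

for (auto& op : ops) {
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
  xpu_wait();  // drain kernels left over from the previous op
  RUNTIME_TRACE_SCOPE_START((op->Type() + " run").c_str(),);
#endif
  op->Run(*local_scope, place_);  // may enqueue asynchronous XPU kernels
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
  RUNTIME_TRACE_SCOPE_END((op->Type() + " run").c_str(),);
  xpu_wait();  // drain this op's kernels before the next span opens
#endif
}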
2 changes: 2 additions & 0 deletions paddle/fluid/framework/executor.h
@@ -28,6 +28,7 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
@@ -36,6 +37,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

namespace paddle {
namespace framework {
2 changes: 1 addition & 1 deletion paddle/fluid/framework/fleet/box_wrapper.cc
@@ -210,7 +210,7 @@ void BoxWrapper::EndPass(bool need_save_delta) {
<< "MB, available: " << (available >> 20) << "MB";
}
#endif
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && defined(PADDLE_WITH_XPU_KP)
static int trace_pass_count = std::getenv("TRACE_PASS_NUM")!=NULL ?
std::stoi(std::string(std::getenv("TRACE_PASS_NUM"))):
1;
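For reference, the trace-pass limit above is read once from the environment: TRACE_PASS_NUM, falling back to 1 when the variable is unset, presumably bounding how many passes get recorded. A standalone sketch of that initialization with a hypothetical helper name:

#include <cstdlib>
#include <string>

// Sketch: mirrors the static initialization in BoxWrapper::EndPass.
// Returns the value of TRACE_PASS_NUM, or 1 when the variable is unset.
static int GetTracePassNum() {
  const char* env = std::getenv("TRACE_PASS_NUM");
  return env != nullptr ? std::stoi(std::string(env)) : 1;
}

As in the original, a non-numeric value would make std::stoi throw; the sketch keeps that behaviour rather than adding validation.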
7 changes: 5 additions & 2 deletions paddle/fluid/framework/fleet/box_wrapper.h
@@ -49,6 +49,8 @@ limitations under the License. */
#include "paddle/fluid/string/string_helper.h"
#include "paddle/fluid/framework/fleet/metrics.h"
#include "paddle/fluid/framework/fleet/box_wrapper_kernel.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
@@ -57,6 +59,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif
#define BUF_SIZE 1024 * 1024

DECLARE_bool(padbox_auc_runner_mode);
@@ -456,7 +459,7 @@ class BoxWrapper {
use_xpu_sparse_map_ = true;
}
#endif
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// Client side to produce the tracepoints.
factory = std::make_shared<scalopus::TransportLoopbackFactory>();
const auto server = factory->serve();
@@ -1077,7 +1080,7 @@ class BoxWrapper {
std::set<std::string> slot_eval_set_;
std::atomic<uint16_t> dataset_id_{0};
std::atomic<uint16_t> round_id_{0};
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
scalopus::TransportLoopbackFactory::Ptr factory;
std::shared_ptr<scalopus::EndpointManagerPoll> manager;
scalopus::CatapultRecorder::Ptr catapult_recorder;
28 changes: 24 additions & 4 deletions paddle/fluid/framework/fleet/box_wrapper_impl.h
@@ -17,6 +17,7 @@ limitations under the License. */

#include <vector>

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
@@ -25,6 +26,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

DECLARE_bool(enable_pullpush_dedup_keys);

@@ -384,7 +386,9 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
void* total_values_xpu =
dev.pull_push_tensor.mutable_data<void>(total_bytes, place);

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("copy keys", xpu_wait(ctx_xpu->xpu_stream));
#endif
VLOG(3) << "Begin copy keys, key_num[" << total_length << "]";
// LoDTensor& total_keys_tensor = dev.keys_tensor;
uint64_t* total_keys;
@@ -413,7 +417,9 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
slot_lengths_lod.size() * sizeof(int64_t),
XPU_HOST_TO_DEVICE);

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("CopyKeys", xpu_wait(ctx_xpu->xpu_stream));
#endif
if (use_xpu_sparse_map_) {
box_wrapper_kernel_->CopyKeys(place, xpu_keys, (unsigned long long *)total_keys, slot_lens,
static_cast<int>(slot_lengths.size()),
@@ -425,20 +431,24 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
}
VLOG(3) << "Begin call PullSparseXPU in BoxPS, dev: " << device_id
<< " len: " << total_length;
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("CopyKeys", xpu_wait(ctx_xpu->xpu_stream));
TRACE_SCOPE_END("copy keys", xpu_wait(ctx_xpu->xpu_stream));

TRACE_SCOPE_START("PullSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
#endif
pull_boxps_timer.Start();
boxps_ptr_->PullSparseXPU(total_keys, total_values_xpu,
static_cast<int>(total_length), device_id);
pull_boxps_timer.Pause();
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("PullSparseXPU", xpu_wait(ctx_xpu->xpu_stream));

#endif
VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length
<< "]";

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("pull copy", xpu_wait(ctx_xpu->xpu_stream));
#endif
boxps::FeaturePullOffset* pull_offset = nullptr;
if (dev.pull_offset.memory_size() == 0) {
pull_offset = dev.pull_offset.mutable_data<boxps::FeaturePullOffset>(
@@ -454,13 +464,17 @@ void BoxWrapper::PullSparseCaseXPU(const paddle::platform::Place& place,
xpu_memcpy(xpu_values, values.data(), values.size() * sizeof(float*),
XPU_HOST_TO_DEVICE);

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("CopyForPull", xpu_wait(ctx_xpu->xpu_stream));
#endif
box_wrapper_kernel_->CopyForPull(place, xpu_keys, (float**)values.data(), total_values_xpu,
pull_offset, slot_lengths_lod.data(), slot_num, key2slot, hidden_size,
expand_embed_dim, total_length, total_dims, skip_offset,
expand_only);
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("CopyForPull", xpu_wait(ctx_xpu->xpu_stream));
TRACE_SCOPE_END("pull copy", xpu_wait(ctx_xpu->xpu_stream));
#endif
all_timer.Pause();
#endif
}
@@ -672,7 +686,9 @@ void BoxWrapper::PushSparseGradCaseXPU(const paddle::platform::Place& place,

all_timer.Resume();

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("push copy", xpu_wait(ctx_xpu->xpu_stream));
#endif
int64_t total_length = dev.total_key_length;
int64_t total_bytes = total_length * feature_push_size_;
void* total_grad_values_xpu =
@@ -709,11 +725,12 @@ void BoxWrapper::PushSparseGradCaseXPU(const paddle::platform::Place& place,
float** xpu_values = dev.values_ptr_tensor.data<float*>();
xpu_memcpy(xpu_values, grad_values.data(),
grad_values.size() * sizeof(float*), XPU_HOST_TO_DEVICE);
#ifdef TRACE_PROFILE
TRACE_SCOPE_START("CopyForPush's xpu::copy", xpu_wait(ctx_xpu->xpu_stream));
TRACE_SCOPE_END("CopyForPush's xpu::copy", xpu_wait(ctx_xpu->xpu_stream));

TRACE_SCOPE_START("CopyForPush", xpu_wait(ctx_xpu->xpu_stream));

#endif
float* real_grad_values;
for (int i = 0; i < slot_num; i++) {
if(grad_values[i] != nullptr) {
@@ -726,18 +743,21 @@ void BoxWrapper::PushSparseGradCaseXPU(const paddle::platform::Place& place,
hidden_size, batch_size, total_dims, skip_offset, key2slot);

push_boxps_timer.Resume();
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("CopyForPush", xpu_wait(ctx_xpu->xpu_stream));
TRACE_SCOPE_END("push copy", xpu_wait(ctx_xpu->xpu_stream));

TRACE_SCOPE_START("PushSparseXPU", xpu_wait(ctx_xpu->xpu_stream));
#endif
int ret = boxps_ptr_->PushSparseXPU(total_keys,
reinterpret_cast<void*>(total_grad_values_xpu),
static_cast<int>(total_length), device_id);
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"PushSparseXPU failed in BoxPS."));
push_boxps_timer.Pause();
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("PushSparseXPU", xpu_wait(ctx_xpu->xpu_stream));

#endif
all_timer.Pause();

#endif
11 changes: 10 additions & 1 deletion paddle/fluid/framework/fleet/box_wrapper_kernel.kps
@@ -28,6 +28,7 @@ limitations under the License. */
#include "xpu/kernel/math.h" // NOLINT
#include "xpu/kernel/simd.h"

#ifdef TRACE_PROFILE
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
@@ -36,6 +37,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

namespace paddle {
namespace framework {
@@ -305,8 +307,9 @@ inline void FeaturePullCopy(const paddle::platform::Place& place,
op,
sizeof(TEmbedxOp));

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("PullCopy", xpu_wait(stream));

#endif
float* real_dst_vals;
for (int i = 0; i < slot_num; i++) {
if(xpu_values[i] != nullptr) {
@@ -327,11 +330,17 @@ inline void FeaturePullCopy(const paddle::platform::Place& place,
skip_offset,
cvm_offset);
xpu_wait(stream);
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("PullCopy", );
#endif

#ifdef TRACE_PROFILE
TRACE_SCOPE_START("PullCopy's xpu::copy", xpu_wait(stream));
#endif
xpu_wait(stream);
#ifdef TRACE_PROFILE
TRACE_SCOPE_END("PullCopy's xpu::copy",);
#endif
}

void BoxWrapperKernel::CopyForPull(
8 changes: 8 additions & 0 deletions paddle/fluid/framework/naive_executor.cc
@@ -48,18 +48,26 @@ void NaiveExecutor::Run() {
platform::RegisterModelLayout(ops_, place_);
#endif
platform::ScopedFlushDenormal flush;
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("naive_executor ops run",);
#endif
for (auto &op : ops_) {
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
xpu_wait();
RUNTIME_TRACE_SCOPE_START((op->Type()+" run").c_str(),);
#endif
VLOG(4) << std::this_thread::get_id() << " run "
<< op->DebugStringEx(scope_) << " on scope " << scope_;
op->SetIsCalledByExecutor(false);
op->Run(*scope_, place_);
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_END((op->Type()+" run").c_str(),);
xpu_wait();
#endif
}
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("naive_executor ops run",);
#endif
}

void NaiveExecutor::CreateVariables(const ProgramDesc &desc,
2 changes: 2 additions & 0 deletions paddle/fluid/framework/naive_executor.h
@@ -24,6 +24,7 @@
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
@@ -32,6 +33,7 @@
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

namespace phi {
class DenseTensor;