Remove LoDTensor and Tensor in fluid except operators folder (#48416)
* Update communicator.cc

* Update communicator.cc

* remove LoDTensor

* remove LoDTensor and Tensor
Liyulingyue authored Nov 28, 2022
1 parent d80330f commit 4527d24
Showing 8 changed files with 114 additions and 113 deletions.
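The change is mechanical across these files: each file's local alias "using LoDTensor = phi::DenseTensor;" is deleted, and every remaining use of LoDTensor is spelled out as phi::DenseTensor. A minimal before/after sketch of the pattern (the names scope, var, and t are illustrative, not tied to any one hunk):

// Before: a file-local alias hides the phi type.
using LoDTensor = phi::DenseTensor;
Variable *var = scope->FindVar(t);
LoDTensor *tensor = var->GetMutable<LoDTensor>();

// After: the alias is gone and the phi type is named directly.
Variable *var = scope->FindVar(t);
phi::DenseTensor *tensor = var->GetMutable<phi::DenseTensor>();
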
28 changes: 14 additions & 14 deletions paddle/fluid/distributed/ps/service/communicator/communicator.cc
@@ -28,7 +28,6 @@ limitations under the License. */
namespace paddle {
namespace distributed {

-using LoDTensor = phi::DenseTensor;
using phi::SelectedRows;

const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100;
@@ -97,11 +96,11 @@ void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,
regions.reserve(varnames.size());
for (auto &t : varnames) {
Variable *var = scope->Var(t);
-LoDTensor *tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor *tensor = var->GetMutable<phi::DenseTensor>();
if (platform::is_gpu_place(tensor->place())) {
#ifdef PADDLE_WITH_CUDA
Variable *temp_var = xpu_temp_scope_->Var(t);
-LoDTensor *temp_tensor = temp_var->GetMutable<LoDTensor>();
+phi::DenseTensor *temp_tensor = temp_var->GetMutable<phi::DenseTensor>();
temp_tensor->Resize(tensor->dims());
float *temp_data = temp_tensor->mutable_data<float>(platform::CPUPlace());
paddle::distributed::Region reg(temp_data, tensor->numel());
@@ -122,7 +121,7 @@ void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,

for (auto &t : varnames) {
Variable *var = scope->FindVar(t);
-LoDTensor *tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor *tensor = var->GetMutable<phi::DenseTensor>();
VLOG(3) << "Communicator::RecvNoBarrier Var " << t << " On gpu? "
<< platform::is_gpu_place(tensor->place());

@@ -132,8 +131,8 @@ void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,
<< " Temp_data[-1] " << temp_recv_data[tensor->numel() - 1];
if (platform::is_gpu_place(tensor->place())) {
#ifdef PADDLE_WITH_CUDA
-LoDTensor *temp_tensor =
-xpu_temp_scope_->FindVar(t)->GetMutable<LoDTensor>();
+phi::DenseTensor *temp_tensor =
+xpu_temp_scope_->FindVar(t)->GetMutable<phi::DenseTensor>();
framework::TensorCopy(*temp_tensor, tensor->place(), tensor);
float *temp_data = temp_tensor->mutable_data<float>(platform::CPUPlace());
VLOG(1) << "Communicator::RpcRecvDense Var " << t << " table_id "
@@ -157,11 +156,11 @@ void Communicator::RpcSendDenseParam(const std::vector<std::string> &varnames,
for (auto &t : varnames) {
Variable *var = scope.FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
-LoDTensor *tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor *tensor = var->GetMutable<phi::DenseTensor>();
if (platform::is_gpu_place(tensor->place())) {
#ifdef PADDLE_WITH_CUDA
Variable *temp_var = xpu_temp_scope_->Var(t);
-LoDTensor *temp_tensor = temp_var->GetMutable<LoDTensor>();
+phi::DenseTensor *temp_tensor = temp_var->GetMutable<phi::DenseTensor>();
temp_tensor->Resize(tensor->dims());
float *temp_data = temp_tensor->mutable_data<float>(platform::CPUPlace());
framework::TensorCopy(*tensor, platform::CPUPlace(), temp_tensor);
@@ -203,7 +202,8 @@ void Communicator::RpcSendDense(const CommContext &ctx,
float *data = dense_data->data();
uint32_t pos = 0;
for (size_t i = 0; i < var_names.size(); ++i) {
-const LoDTensor tensor = scope.FindVar(var_names[i])->Get<LoDTensor>();
+const phi::DenseTensor tensor =
+scope.FindVar(var_names[i])->Get<phi::DenseTensor>();
size_t count = static_cast<size_t>(tensor.numel());
const float *g = tensor.data<float>();
CHECK(pos + count <= dense_data->size())
@@ -472,13 +472,13 @@ void AsyncCommunicator::RecvNoBarrier() {
auto var_names = iter.second;
for (auto &t : var_names) {
Variable *var = recv_scope_->FindVar(t);
-LoDTensor *tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor *tensor = var->GetMutable<phi::DenseTensor>();
VLOG(3) << "AsyncCommunicator::RecvNoBarrier Var " << t << " On gpu? "
<< platform::is_gpu_place(tensor->place());
if (platform::is_gpu_place(tensor->place())) {
#ifdef PADDLE_WITH_CUDA
-LoDTensor *temp_tensor =
-xpu_temp_scope_->FindVar(t)->GetMutable<LoDTensor>();
+phi::DenseTensor *temp_tensor =
+xpu_temp_scope_->FindVar(t)->GetMutable<phi::DenseTensor>();
framework::TensorCopy(*temp_tensor, tensor->place(), tensor);
#endif
}
@@ -591,8 +591,8 @@ void AsyncCommunicator::PullSparseToTensorSync(
uint64_t padding_id,
platform::Place place,
bool is_training,
-std::vector<const LoDTensor *> *inputs,
-std::vector<LoDTensor *> *outputs) {
+std::vector<const phi::DenseTensor *> *inputs,
+std::vector<phi::DenseTensor *> *outputs) {
std::vector<uint64_t> fea_keys;
std::vector<float *> pull_result_ptr;
fea_keys.reserve(MAX_FEASIGN_NUM / 100);
46 changes: 23 additions & 23 deletions paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -25,7 +25,6 @@ limitations under the License. */
namespace paddle {
namespace distributed {

-using LoDTensor = phi::DenseTensor;
using framework::ProgramDesc;
using framework::VarDesc;
using framework::Variable;
@@ -232,7 +231,7 @@ std::future<int32_t> FleetWrapper::PullSparseVarsAsync(
if (var == nullptr) {
continue;
}
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();
@@ -279,7 +278,7 @@ void FleetWrapper::PullSparseVarsSync(
if (var == nullptr) {
continue;
}
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();
@@ -327,13 +326,14 @@ void FleetWrapper::PullSparseVarsSync(
// is_training is true means training, false means inference, the behavior is
// different on pserver

-void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id,
-int fea_dim,
-uint64_t padding_id,
-platform::Place place,
-bool is_training,
-std::vector<const LoDTensor*>* inputs,
-std::vector<LoDTensor*>* outputs) {
+void FleetWrapper::PullSparseToTensorSync(
+const uint64_t table_id,
+int fea_dim,
+uint64_t padding_id,
+platform::Place place,
+bool is_training,
+std::vector<const phi::DenseTensor*>* inputs,
+std::vector<phi::DenseTensor*>* outputs) {
std::vector<uint64_t> fea_keys;
std::vector<float*> pull_result_ptr;
fea_keys.reserve(MAX_FEASIGN_NUM / 100);
@@ -398,7 +398,7 @@ void FleetWrapper::PullDenseVarsAsync(
varname = var_names[i] + "pin";
}
Variable* var = scope.FindVar(varname);
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
float* w = tensor->data<float>();
paddle::distributed::Region reg(w, tensor->numel());
regions[i] = std::move(reg);
@@ -417,7 +417,7 @@ void FleetWrapper::PullDenseVarsSync(
regions.reserve(var_names.size());
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
if (!platform::is_gpu_place(tensor->place())) {
float* w = tensor->data<float>();
paddle::distributed::Region reg(w, tensor->numel());
@@ -437,7 +437,7 @@ void FleetWrapper::PushDenseParamSync(
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
if (!platform::is_gpu_place(tensor->place())) {
float* g = tensor->mutable_data<float>(place);
paddle::distributed::Region reg(g, tensor->numel());
@@ -468,7 +468,7 @@ void FleetWrapper::PushDenseVarsAsync(
for (auto& t : var_names) {
Variable* var = scope.FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
int count = tensor->numel();
float* g = tensor->mutable_data<float>(place);
// TODO(zhaocaibei123): how to get batch_size in op?
@@ -544,8 +544,8 @@ void FleetWrapper::PushSparseFromTensorWithLabelAsync(
const std::string& click_name,
platform::Place place,
const std::vector<std::string>& input_names,
-std::vector<const LoDTensor*>* inputs,
-std::vector<const LoDTensor*>* outputs) {
+std::vector<const phi::DenseTensor*>* inputs,
+std::vector<const phi::DenseTensor*>* outputs) {
// not support
return;
}
@@ -555,11 +555,11 @@ void FleetWrapper::PushSparseFromTensorAsync(
int fea_dim,
uint64_t padding_id,
platform::Place place,
-std::vector<const LoDTensor*>* inputs,
+std::vector<const phi::DenseTensor*>* inputs,
std::vector<int>& slots,
-const LoDTensor* shows,
-const LoDTensor* clks,
-std::vector<LoDTensor*>* outputs,
+const phi::DenseTensor* shows,
+const phi::DenseTensor* clks,
+std::vector<phi::DenseTensor*>* outputs,
bool use_cvm_op) {
CHECK(slots.size() == inputs->size());
int batch_size = -1;
@@ -777,7 +777,7 @@ void FleetWrapper::ShrinkDenseTable(int table_id,
Variable* var = scope->FindVar(name);
CHECK(var != nullptr) << "var[" << name << "] not found";
VLOG(3) << "prepare shrink dense batch_sum";
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
float* g = tensor->data<float>();

// show_batch_sum += N * log(decay)
@@ -787,7 +787,7 @@ void FleetWrapper::ShrinkDenseTable(int table_id,
Variable* var_size = scope->FindVar(size_name);
CHECK(var_size != nullptr) << "var[" << size_name << "] not found";
VLOG(3) << "shrink dense batch_sum: " << name << ", " << size_name;
-float* g_size = var_size->GetMutable<LoDTensor>()->data<float>();
+float* g_size = var_size->GetMutable<phi::DenseTensor>()->data<float>();

for (int k = 0; k < tensor->numel(); k += emb_dim) {
g[k] = g[k] + g_size[k] * log(decay);
@@ -797,7 +797,7 @@ void FleetWrapper::ShrinkDenseTable(int table_id,
} else {
Variable* var = scope->FindVar(name);
CHECK(var != nullptr) << "var[" << name << "] not found";
-LoDTensor* tensor = var->GetMutable<LoDTensor>();
+phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
float* g = tensor->data<float>();
paddle::distributed::Region reg(g, tensor->numel());
regions.emplace_back(std::move(reg));
28 changes: 14 additions & 14 deletions paddle/fluid/distributed/ps/wrapper/fleet.h
@@ -47,7 +47,6 @@ namespace distributed {

class PSCore;

-using LoDTensor = phi::DenseTensor;
using framework::Scope;
using framework::Variable;
using phi::SelectedRows;
@@ -111,13 +110,14 @@ class FleetWrapper {
// is_training is true means training, false means inference, the behavior is
// different on pserver

-void PullSparseToTensorSync(const uint64_t table_id,
-int fea_dim,
-uint64_t padding_id,
-platform::Place place,
-bool is_training,
-std::vector<const LoDTensor*>* inputs, // NOLINT
-std::vector<LoDTensor*>* outputs); // NOLINT
+void PullSparseToTensorSync(
+const uint64_t table_id,
+int fea_dim,
+uint64_t padding_id,
+platform::Place place,
+bool is_training,
+std::vector<const phi::DenseTensor*>* inputs, // NOLINT
+std::vector<phi::DenseTensor*>* outputs); // NOLINT

// pull dense variables from server in sync mod
// Param<in>: scope, table_id, var_names
@@ -188,18 +188,18 @@
const std::string& click_name,
platform::Place place,
const std::vector<std::string>& input_names,
-std::vector<const LoDTensor*>* inputs, // NOLINT
-std::vector<const LoDTensor*>* outputs); // NOLINT
+std::vector<const phi::DenseTensor*>* inputs, // NOLINT
+std::vector<const phi::DenseTensor*>* outputs); // NOLINT

void PushSparseFromTensorAsync(const uint64_t table_id,
int fea_dim,
uint64_t padding_id,
platform::Place place,
-std::vector<const LoDTensor*>* inputs,
+std::vector<const phi::DenseTensor*>* inputs,
std::vector<int>& slots, // NOLINT
-const LoDTensor* shows,
-const LoDTensor* clicks,
-std::vector<LoDTensor*>* outputs,
+const phi::DenseTensor* shows,
+const phi::DenseTensor* clicks,
+std::vector<phi::DenseTensor*>* outputs,
bool use_cvm_op = false);
// Push sparse variables to server in Async mode
// Param<In>: scope, table_id, fea_keys, sparse_grad_names
1 change: 0 additions & 1 deletion paddle/fluid/distributed/ps/wrapper/ps_wrapper.h
@@ -47,7 +47,6 @@ namespace distributed {

class PSCore;

-using LoDTensor = phi::DenseTensor;
using framework::Scope;
using framework::Variable;
using phi::SelectedRows;