Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XPU Support external stream #53334

Merged
merged 4 commits into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions paddle/fluid/inference/api/analysis_predictor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,26 @@ bool AnalysisPredictor::Init(
InitDeviceContexts();
}
}
#endif
#if defined(PADDLE_WITH_XPU)
if (config_.use_xpu_ && config_.use_external_stream_) {
private_context_ = true;
}
if (private_context_) {
if (!status_is_cloned_) {
predictor_stream_ = config_.GetExecStream();
}
// NOTE: If the external_stream equals to global_device_contexts's stream,
// then fallback.
auto global_stream =
static_cast<phi::XPUContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
if (predictor_stream_ != global_stream) {
InitResourceManager(predictor_stream_);
InitDeviceContexts();
}
}
#endif
inference::DisplayMemoryInfo(place_, "Init predictor");
return true;
Expand Down Expand Up @@ -418,6 +438,9 @@ void AnalysisPredictor::InitResourceManager(void *stream) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
predictor_stream_ =
ResourceManager::Instance().InitGPUResource(place_, stream);
#elif defined(PADDLE_WITH_XPU)
predictor_stream_ =
ResourceManager::Instance().InitXPUResource(place_, stream);
#endif
}

Expand Down Expand Up @@ -487,6 +510,32 @@ void AnalysisPredictor::InitDeviceContexts() {
return std::unique_ptr<phi::DeviceContext>(gpu_context);
}));
}
#endif
#if defined(PADDLE_WITH_XPU)
if (place_.GetType() == phi::AllocationType::XPU) {
device_contexts_.emplace(
place_, std::async(std::launch::deferred, [=] {
auto *xpu_resource =
ResourceManager::Instance().GetXPUResource(predictor_stream_);
auto &instance = memory::allocation::AllocatorFacade::Instance();
auto *xpu_context = new InferXPUContext(place_);
xpu_context->SetAllocator(instance.GetAllocator(place_).get());
xpu_context->SetGenerator(
phi::DefaultXPUGenerator(place_.GetDeviceId()).get());
xpu_context->SetHostAllocator(
instance.GetAllocator(platform::CPUPlace()).get());
xpu_context->SetHostGenerator(phi::DefaultCPUGenerator().get());
xpu_context->SetZeroAllocator(
instance.GetZeroAllocator(place_).get());
xpu_context->SetHostZeroAllocator(
instance.GetZeroAllocator(platform::CPUPlace()).get());
xpu_context->SetStream(xpu_resource->GetStream());
xpu_context->SetDriverVersion(xpu_resource->GetDriverVersion());
xpu_context->SetRuntimeVersion(xpu_resource->GetRuntimeVersion());
xpu_context->SetXpuVersion(xpu_resource->GetXpuVersion());
return std::unique_ptr<phi::DeviceContext>(xpu_context);
}));
}
#endif
// TODO(Inference): Support other backends.
}
Expand All @@ -506,10 +555,14 @@ void *AnalysisPredictor::GetExecStream() const {
#endif
#if defined(PADDLE_WITH_XPU)
if (place_.GetType() == phi::AllocationType::XPU) {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::XPUContext *>(pool.Get(place_))
->stream();
if (private_context_) {
return predictor_stream_;
} else {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::XPUContext *>(pool.Get(place_))
->stream();
}
}
#endif
// TODO(inference): Support other backends.
Expand Down Expand Up @@ -2041,6 +2094,33 @@ bool AnalysisPredictor::ExpRunWithExternalStream(const gpuStream_t stream) {
}
#endif

// Runs inference bound to an externally created XPU stream. Only legal when
// this predictor owns private per-stream resources (i.e. the config supplied
// an exec stream at Init time, which set private_context_).
bool AnalysisPredictor::ExpRunWithExternalStream(void *stream) {
#if defined(PADDLE_WITH_XPU)
  if (!private_context_) {
    PADDLE_THROW(platform::errors::Fatal(
        "Please use config.SetExecStream to init resources, and then we "
        "will bind resources to execution stream."));
  }
  // Only re-bind when the caller hands over a stream different from the one
  // currently attached to this predictor.
  if (stream != predictor_stream_) {
    // Drain all work queued on the old stream before its resources are moved
    // to the new stream.
    paddle::platform::XPUStreamSync(
        static_cast<paddle::xpuStream>(predictor_stream_));
    ResourceManager::Instance().XpuResourceReBindStream(predictor_stream_,
                                                       stream);
    predictor_stream_ = stream;

    // Also point the cached per-place device context at the new stream so
    // subsequent kernels launch on it.
    auto *dev_ctxs = reinterpret_cast<const std::map<
        phi::Place,
        std::shared_future<std::unique_ptr<phi::DeviceContext>>> *>(
        this->GetDeviceContexts());
    auto *dev_ctx =
        static_cast<InferXPUContext *>(dev_ctxs->at(place_).get().get());
    dev_ctx->SetStream(stream);
  }
  return ZeroCopyRun();
#endif
  // Non-XPU builds: external-stream execution is not supported here.
  return false;
}

void AnalysisPredictor::CollectShapeRangeInfo() {
// if use gpu, sync first.
paddle::platform::DeviceContextPool &pool =
Expand Down Expand Up @@ -2404,7 +2484,12 @@ AnalysisPredictor::~AnalysisPredictor() {
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyGPUResource(predictor_stream_);
}
#elif defined(PADDLE_WITH_XPU)
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyXPUResource(predictor_stream_);
}
#endif

if (place_.GetType() != phi::AllocationType::UNDEFINED) {
memory::Release(place_);
}
Expand Down Expand Up @@ -2911,6 +2996,11 @@ bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p,
#endif
return false;
}
// Runs `p` on an externally managed (XPU) execution stream.
//
// Fix: dynamic_cast to AnalysisPredictor can yield nullptr when `p` wraps a
// different PaddlePredictor implementation; the original dereferenced the
// result unconditionally (undefined behavior). Return false instead, matching
// the "unsupported" convention of the sibling overloads.
bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p,
                                          void *stream) {
  auto *pred = dynamic_cast<paddle::AnalysisPredictor *>(p->predictor_.get());
  if (pred == nullptr) {
    return false;
  }
  return pred->ExpRunWithExternalStream(stream);
}

void InternalUtils::UpdateConfigInterleaved(paddle_infer::Config *c,
bool with_interleaved) {
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/inference/api/analysis_predictor.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ class AnalysisPredictor : public PaddlePredictor {
bool ExpRunWithExternalStream(const gpuStream_t stream);
#endif

// Note: Can only be used under thread_local semantics.
bool ExpRunWithExternalStream(void *stream);

///
/// \brief Get the execution stream on devices with a concept of stream,
/// otherwise returns nullptr.
Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/inference/api/infer_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ InferGPUContext::InferGPUContext(const phi::Place& place)
: phi::GPUContext(place, false) {}
#endif

#if defined(PADDLE_WITH_XPU)
// Inference-specialized XPUContext for `place`. It simply forwards to the
// phi::XPUContext constructor; the derived type exists so the predictor can
// reach the base-class setters re-exposed in infer_context.h.
InferXPUContext::InferXPUContext(const phi::Place& place)
    : phi::XPUContext(place) {}
#endif

} // namespace paddle
11 changes: 11 additions & 0 deletions paddle/fluid/inference/api/infer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,15 @@ class InferGPUContext : public phi::GPUContext {
using phi::GPUContext::SetRuntimeVersion;
};
#endif

#if defined(PADDLE_WITH_XPU)
// XPUContext variant used by the inference predictor. The using-declarations
// re-publish base-class setters (presumably non-public in phi::XPUContext —
// confirm against the phi backend headers) so the predictor can install its
// own stream and device-version info from ResourceManager.
class InferXPUContext : public phi::XPUContext {
 public:
  explicit InferXPUContext(const phi::Place& place);
  using phi::XPUContext::SetDriverVersion;
  using phi::XPUContext::SetRuntimeVersion;
  using phi::XPUContext::SetStream;
  using phi::XPUContext::SetXpuVersion;
};
#endif
} // namespace paddle
3 changes: 2 additions & 1 deletion paddle/fluid/inference/api/paddle_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ class PD_INFER_DECL InternalUtils {
cudaStream_t stream);
static bool RunWithExternalStream(paddle_infer::Predictor* pred,
hipStream_t stream);

static bool RunWithExternalStream(paddle_infer::Predictor* pred,
void* stream);
static void UpdateConfigInterleaved(paddle_infer::Config* c,
bool with_interleaved);

Expand Down
120 changes: 120 additions & 0 deletions paddle/fluid/inference/api/resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
#include "paddle/phi/backends/dynload/cusparse.h"
#endif // PADDLE_WITH_CUDA

#ifdef PADDLE_WITH_XPU
#include "paddle/phi/backends/xpu/xpu_info.h"
#endif
namespace paddle {
namespace internal {

Expand Down Expand Up @@ -545,4 +548,121 @@ int ResourceManager::RefCount(void* stream) const {
}
#endif

#if defined(PADDLE_WITH_XPU)
// XPUContextResource
// Per-stream XPU state (stream handle plus device version info) for one
// place. The stream pointer, when supplied, stays owned by the caller.
XPUContextResource::XPUContextResource(const phi::Place& place, void* stream)
    : place_(place) {
  InitXPUResource(stream);
}

// Nothing to release: the stream is externally owned (owned_stream_ is set
// to false whenever a stream is recorded), so no teardown happens here.
XPUContextResource::~XPUContextResource() {}

// Binds this resource to `stream` and queries device properties.
//
// Fix: `stream_` is now assigned unconditionally. Previously it was only set
// when `stream` was non-null, and the member has no in-class initializer, so
// a null `stream` left `stream_` indeterminate and GetStream() returned
// garbage. A non-null stream is treated as externally owned and is never
// destroyed by this class.
void XPUContextResource::InitXPUResource(void* stream) {
  phi::backends::xpu::XPUDeviceGuard guard(place_.device);
  if (stream) {
    owned_stream_ = false;
  }
  stream_ = stream;
  InitXpuProperties();
}

// Caches driver/runtime/hardware version numbers for this resource's device.
// NOTE(review): the device guard presumably scopes the version queries to
// place_.device — confirm against phi::backends::xpu.
void XPUContextResource::InitXpuProperties() {
  phi::backends::xpu::XPUDeviceGuard guard(place_.device);
  driver_version_ = phi::backends::xpu::GetDriverVersion();
  runtime_version_ = phi::backends::xpu::GetRuntimeVersion();
  xpu_version_ =
      static_cast<int>(phi::backends::xpu::get_xpu_version(place_.device));
}

// Raw, externally owned stream handle recorded at construction/rebind time.
void* XPUContextResource::GetStream() const { return stream_; }

int XPUContextResource::GetDriverVersion() const { return driver_version_; }

int XPUContextResource::GetRuntimeVersion() const { return runtime_version_; }

int XPUContextResource::GetXpuVersion() const { return xpu_version_; }

// Points this resource at a different externally owned stream. Cached
// version properties are kept; only the handle changes.
void XPUContextResource::ReBindStream(void* stream) {
  owned_stream_ = false;
  stream_ = stream;
}
// XPUContextResource End.
// XPUContextResource End.

// Resource Manager
// Registers an XPU resource keyed by `stream`, or bumps the reference count
// when that stream is already known. Returns the map key (the resource's
// stream handle) under which the resource is tracked.
void* ResourceManager::InitXPUResource(const phi::Place& place, void* stream) {
  std::lock_guard<std::mutex> lock(xpu_mutex_);
  if (xpu_resources_.count(stream)) {
    Increase(stream);
    return stream;
  }
  std::unique_ptr<XPUContextResource> resource{
      new XPUContextResource(place, stream)};
  void* key = resource->GetStream();
  ref_count_[key] = 1;
  xpu_resources_.emplace(key, std::move(resource));
  return key;
}

// Returns the XPUContextResource registered for `stream`; throws
// InvalidArgument if the stream was never registered via InitXPUResource.
// NOTE(review): reads xpu_resources_ without taking xpu_mutex_ — presumably
// callers serialize lookups against registration; confirm.
XPUContextResource* ResourceManager::GetXPUResource(void* stream) const {
  PADDLE_ENFORCE_EQ(xpu_resources_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in xpu_resources.", stream));
  return xpu_resources_.at(stream).get();
}

// Moves the resource registered under `old_stream` so it is keyed by (and
// bound to) `new_stream`. Statement order matters: the resource is moved out
// of the map before DestroyXPUResource erases the old entry, and the second
// enforce verifies the old refcount really dropped to zero.
// NOTE(review): mutates both maps without taking xpu_mutex_ — presumably the
// caller guarantees exclusive access during rebinding; confirm.
void ResourceManager::XpuResourceReBindStream(void* old_stream,
                                              void* new_stream) {
  PADDLE_ENFORCE_EQ(
      xpu_resources_.count(old_stream),
      true,
      platform::errors::InvalidArgument(
          "The stream[%p] not found in xpu_resources.", old_stream));
  // Keep the resource alive across the erase of its old key.
  auto xpu_resource = std::move(xpu_resources_.at(old_stream));
  DestroyXPUResource(old_stream);
  PADDLE_ENFORCE_EQ(
      ref_count_.count(old_stream),
      0,
      platform::errors::Fatal("xpu resources rebind stream failed."));

  // Re-register under the new stream and account for the reference.
  xpu_resource->ReBindStream(new_stream);
  ref_count_[new_stream]++;
  xpu_resources_.emplace(new_stream, std::move(xpu_resource));
}

// Releases one reference to the resource keyed by `stream`. The resource is
// actually destroyed only when the count reaches zero (see Decrease).
void ResourceManager::DestroyXPUResource(void* stream) {
  PADDLE_ENFORCE_EQ(xpu_resources_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in xpu_resources.", stream));
  Decrease(stream);
}

// Drops one reference to `stream`; when the count hits zero, both the
// refcount entry and the owning XPUContextResource are erased.
void ResourceManager::Decrease(void* stream) {
  PADDLE_ENFORCE_EQ(ref_count_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in ref_count.", stream));
  auto& refs = ref_count_[stream];
  --refs;
  if (refs == 0) {
    ref_count_.erase(stream);
    xpu_resources_.erase(stream);
  }
}

// Adds one reference to an already-registered `stream`.
void ResourceManager::Increase(void* stream) {
  PADDLE_ENFORCE_EQ(ref_count_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in ref_count.", stream));
  ++ref_count_.at(stream);
}

// Current reference count for `stream`; 0 when the stream is unknown.
int ResourceManager::RefCount(void* stream) const {
  auto it = ref_count_.find(stream);
  return it == ref_count_.end() ? 0 : it->second.load();
}
// Resource Manager End.

#endif
} // namespace paddle
49 changes: 49 additions & 0 deletions paddle/fluid/inference/api/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ class GPUContextResource {
};
#endif

#if defined(PADDLE_WITH_XPU)
// Per-stream XPU resource: records an externally owned stream handle and the
// device's driver/runtime/hardware versions for one place.
//
// Fix: all scalar members now carry in-class initializers. Previously
// `stream_` (and the version fields, until InitXpuProperties ran) were
// indeterminate; in particular, constructing with a null stream left
// `stream_` uninitialized and GetStream() returned garbage.
class XPUContextResource {
 public:
  explicit XPUContextResource(const phi::Place& place, void* stream);
  ~XPUContextResource();
  // NOTE(review): Place() is declared but no definition appears in this diff
  // — confirm it exists in resource_manager.cc.
  phi::Place Place() const;
  void* GetStream() const;
  int GetDriverVersion() const;
  int GetRuntimeVersion() const;
  int GetXpuVersion() const;
  // Re-binds to a different externally owned stream.
  void ReBindStream(void* stream);

 private:
  void InitXPUResource(void* stream);
  void InitXpuProperties();

 private:
  // False once an external stream is recorded; this class never owns or
  // destroys a stream.
  bool owned_stream_{true};
  void* stream_{nullptr};
  phi::Place place_;

  int driver_version_{0};
  int runtime_version_{0};
  int xpu_version_{0};
};  // class XPUContextResource
#endif

class ResourceManager {
public:
ResourceManager() = default;
Expand Down Expand Up @@ -173,6 +200,28 @@ class ResourceManager {
gpu_resources_;
#endif

// XPU Resource
#if defined(PADDLE_WITH_XPU)

public:
void* InitXPUResource(const phi::Place& place, void* stream);
void DestroyXPUResource(void* stream);
XPUContextResource* GetXPUResource(void* stream) const;
int RefCount(void* stream) const;
void XpuResourceReBindStream(void* old_stream, void* new_stream);

private:
void Decrease(void* stream);
void Increase(void* stream);

private:
std::mutex xpu_mutex_;
// a stream corresponding to a series of resource.
std::map<void* /*stream*/, std::atomic<int>> ref_count_;
std::map<void* /*stream*/, std::unique_ptr<XPUContextResource>>
xpu_resources_;
#endif

private:
DISABLE_COPY_AND_ASSIGN(ResourceManager);
};
Expand Down
Loading