Skip to content

Commit

Permalink
XPU Support external stream (#53334)
Browse files Browse the repository at this point in the history
  • Loading branch information
csy0225 committed May 6, 2023
1 parent eda8df7 commit 99399f3
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 6 deletions.
98 changes: 94 additions & 4 deletions paddle/fluid/inference/api/analysis_predictor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,26 @@ bool AnalysisPredictor::Init(
InitDeviceContexts();
}
}
#endif
#if defined(PADDLE_WITH_XPU)
if (config_.use_xpu_ && config_.use_external_stream_) {
private_context_ = true;
}
if (private_context_) {
if (!status_is_cloned_) {
predictor_stream_ = config_.GetExecStream();
}
// NOTE: If the external_stream equals to global_device_contexts's stream,
// then fallback.
auto global_stream =
static_cast<phi::XPUContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
if (predictor_stream_ != global_stream) {
InitResourceManager(predictor_stream_);
InitDeviceContexts();
}
}
#endif
inference::DisplayMemoryInfo(place_, "Init predictor");
return true;
Expand Down Expand Up @@ -418,6 +438,9 @@ void AnalysisPredictor::InitResourceManager(void *stream) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
predictor_stream_ =
ResourceManager::Instance().InitGPUResource(place_, stream);
#elif defined(PADDLE_WITH_XPU)
predictor_stream_ =
ResourceManager::Instance().InitXPUResource(place_, stream);
#endif
}

Expand Down Expand Up @@ -487,6 +510,32 @@ void AnalysisPredictor::InitDeviceContexts() {
return std::unique_ptr<phi::DeviceContext>(gpu_context);
}));
}
#endif
#if defined(PADDLE_WITH_XPU)
if (place_.GetType() == phi::AllocationType::XPU) {
device_contexts_.emplace(
place_, std::async(std::launch::deferred, [=] {
auto *xpu_resource =
ResourceManager::Instance().GetXPUResource(predictor_stream_);
auto &instance = memory::allocation::AllocatorFacade::Instance();
auto *xpu_context = new InferXPUContext(place_);
xpu_context->SetAllocator(instance.GetAllocator(place_).get());
xpu_context->SetGenerator(
phi::DefaultXPUGenerator(place_.GetDeviceId()).get());
xpu_context->SetHostAllocator(
instance.GetAllocator(platform::CPUPlace()).get());
xpu_context->SetHostGenerator(phi::DefaultCPUGenerator().get());
xpu_context->SetZeroAllocator(
instance.GetZeroAllocator(place_).get());
xpu_context->SetHostZeroAllocator(
instance.GetZeroAllocator(platform::CPUPlace()).get());
xpu_context->SetStream(xpu_resource->GetStream());
xpu_context->SetDriverVersion(xpu_resource->GetDriverVersion());
xpu_context->SetRuntimeVersion(xpu_resource->GetRuntimeVersion());
xpu_context->SetXpuVersion(xpu_resource->GetXpuVersion());
return std::unique_ptr<phi::DeviceContext>(xpu_context);
}));
}
#endif
// TODO(Inference): Support other backends.
}
Expand All @@ -506,10 +555,14 @@ void *AnalysisPredictor::GetExecStream() const {
#endif
#if defined(PADDLE_WITH_XPU)
if (place_.GetType() == phi::AllocationType::XPU) {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::XPUContext *>(pool.Get(place_))
->stream();
if (private_context_) {
return predictor_stream_;
} else {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::XPUContext *>(pool.Get(place_))
->stream();
}
}
#endif
// TODO(inference): Support other backends.
Expand Down Expand Up @@ -2050,6 +2103,33 @@ bool AnalysisPredictor::ExpRunWithExternalStream(const gpuStream_t stream) {
}
#endif

// Runs inference on a caller-provided XPU stream, re-binding the predictor's
// per-stream resources first when the stream differs from the one currently
// bound.  Only valid when this predictor owns a private device context (the
// user configured an external stream via config.SetExecStream); per the
// declaration, can only be used under thread_local semantics.
bool AnalysisPredictor::ExpRunWithExternalStream(void *stream) {
#if defined(PADDLE_WITH_XPU)
  if (!private_context_) {
    // Without a private context the predictor shares the global device
    // context, whose stream cannot be swapped out safely.
    PADDLE_THROW(platform::errors::Fatal(
        "Please use config.SetExecStream to init resources, and then we "
        "will bind resources to execution stream."));
  }
  if (stream != predictor_stream_) {
    // Drain all work queued on the previous stream before moving resources.
    paddle::platform::XPUStreamSync(
        static_cast<paddle::xpuStream>(predictor_stream_));
    // Re-key this predictor's resources in the ResourceManager from the old
    // stream handle to the new one.
    ResourceManager::Instance().XpuResourceReBindStream(predictor_stream_,
                                                        stream);
    predictor_stream_ = stream;

    // Also point the cached InferXPUContext at the new stream so subsequent
    // kernels launch on it.  GetDeviceContexts() returns an opaque pointer;
    // it is reinterpreted back to the concrete map type created in
    // InitDeviceContexts.
    auto *dev_ctxs = reinterpret_cast<const std::map<
        phi::Place,
        std::shared_future<std::unique_ptr<phi::DeviceContext>>> *>(
        this->GetDeviceContexts());
    auto *dev_ctx =
        static_cast<InferXPUContext *>(dev_ctxs->at(place_).get().get());
    dev_ctx->SetStream(stream);
  }
  return ZeroCopyRun();
#endif
  // Built without XPU support: there is nothing to run on an XPU stream.
  return false;
}

void AnalysisPredictor::CollectShapeRangeInfo() {
// if use gpu, sync first.
paddle::platform::DeviceContextPool &pool =
Expand Down Expand Up @@ -2413,7 +2493,12 @@ AnalysisPredictor::~AnalysisPredictor() {
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyGPUResource(predictor_stream_);
}
#elif defined(PADDLE_WITH_XPU)
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyXPUResource(predictor_stream_);
}
#endif

if (place_.GetType() != phi::AllocationType::UNDEFINED) {
memory::Release(place_);
}
Expand Down Expand Up @@ -2922,6 +3007,11 @@ bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p,
#endif
return false;
}
// Runs `p` on a caller-provided (XPU) stream.
// Returns false when the predictor is not an AnalysisPredictor or when the
// run itself fails.
bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p,
                                          void *stream) {
  auto pred = dynamic_cast<paddle::AnalysisPredictor *>(p->predictor_.get());
  // dynamic_cast yields nullptr for other predictor implementations; report
  // failure instead of dereferencing a null pointer.
  if (pred == nullptr) {
    return false;
  }
  return pred->ExpRunWithExternalStream(stream);
}

void InternalUtils::UpdateConfigInterleaved(paddle_infer::Config *c,
bool with_interleaved) {
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/inference/api/analysis_predictor.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ class AnalysisPredictor : public PaddlePredictor {
bool ExpRunWithExternalStream(const gpuStream_t stream);
#endif

// Note: Can only be used under thread_local semantics.
bool ExpRunWithExternalStream(void *stream);

///
/// \brief Get the execution stream on devices with a concept of stream,
/// otherwise returns nullptr.
Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/inference/api/infer_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ InferGPUContext::InferGPUContext(const phi::Place& place)
: phi::GPUContext(place, false) {}
#endif

#if defined(PADDLE_WITH_XPU)
// Forwards straight to the phi::XPUContext constructor; the subclass exists
// only to re-export the base class's setters publicly (see infer_context.h)
// so the inference runtime can configure the context after construction.
InferXPUContext::InferXPUContext(const phi::Place& place)
    : phi::XPUContext(place) {}
#endif

} // namespace paddle
11 changes: 11 additions & 0 deletions paddle/fluid/inference/api/infer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,15 @@ class InferGPUContext : public phi::GPUContext {
using phi::GPUContext::SetRuntimeVersion;
};
#endif

#if defined(PADDLE_WITH_XPU)
// Thin wrapper around phi::XPUContext used by the inference API.  The
// using-declarations re-export the base-class setters (presumably protected
// in phi::XPUContext — confirm) as public, so AnalysisPredictor can wire up
// stream and version info on a context it builds manually in
// InitDeviceContexts.
class InferXPUContext : public phi::XPUContext {
 public:
  explicit InferXPUContext(const phi::Place& place);
  using phi::XPUContext::SetDriverVersion;
  using phi::XPUContext::SetRuntimeVersion;
  using phi::XPUContext::SetStream;
  using phi::XPUContext::SetXpuVersion;
};
#endif
} // namespace paddle
3 changes: 2 additions & 1 deletion paddle/fluid/inference/api/paddle_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ class PD_INFER_DECL InternalUtils {
cudaStream_t stream);
static bool RunWithExternalStream(paddle_infer::Predictor* pred,
hipStream_t stream);

static bool RunWithExternalStream(paddle_infer::Predictor* pred,
void* stream);
static void UpdateConfigInterleaved(paddle_infer::Config* c,
bool with_interleaved);

Expand Down
120 changes: 120 additions & 0 deletions paddle/fluid/inference/api/resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
#include "paddle/phi/backends/dynload/cusparse.h"
#endif // PADDLE_WITH_CUDA

#ifdef PADDLE_WITH_XPU
#include "paddle/phi/backends/xpu/xpu_info.h"
#endif
namespace paddle {
namespace internal {

Expand Down Expand Up @@ -545,4 +548,121 @@ int ResourceManager::RefCount(void* stream) const {
}
#endif

#if defined(PADDLE_WITH_XPU)
// XPUContextResource
// Per-stream state tracked by ResourceManager: the (externally owned) stream
// handle plus cached device/driver/runtime version numbers.
XPUContextResource::XPUContextResource(const phi::Place& place, void* stream)
    : place_(place) {
  InitXPUResource(stream);
}

XPUContextResource::~XPUContextResource() {}

// Records the externally supplied stream (if any) and queries the device
// properties of place_.
// NOTE(review): when `stream` is null, stream_ is never assigned here —
// callers appear to always pass a non-null external stream; confirm, or
// give stream_ an in-class initializer in resource_manager.h.
void XPUContextResource::InitXPUResource(void* stream) {
  phi::backends::xpu::XPUDeviceGuard guard(place_.device);
  if (stream) {
    // We merely borrow the caller's stream; never destroy it.
    owned_stream_ = false;
    stream_ = stream;
  }
  InitXpuProperties();
}

// Caches driver/runtime/hardware version numbers for the device of place_.
void XPUContextResource::InitXpuProperties() {
  phi::backends::xpu::XPUDeviceGuard guard(place_.device);
  driver_version_ = phi::backends::xpu::GetDriverVersion();
  runtime_version_ = phi::backends::xpu::GetRuntimeVersion();
  xpu_version_ =
      static_cast<int>(phi::backends::xpu::get_xpu_version(place_.device));
}
void* XPUContextResource::GetStream() const { return stream_; }

int XPUContextResource::GetDriverVersion() const { return driver_version_; }

int XPUContextResource::GetRuntimeVersion() const { return runtime_version_; }

int XPUContextResource::GetXpuVersion() const { return xpu_version_; }

// Re-points this resource at a different, still externally owned stream.
void XPUContextResource::ReBindStream(void* stream) {
  owned_stream_ = false;
  stream_ = stream;
}
// XPUContextResource End.

// Resource Manager
// Registers a resource for `stream` (or bumps its reference count when the
// stream is already tracked) and returns the key under which the resource is
// stored.  Serialized via xpu_mutex_.
void* ResourceManager::InitXPUResource(const phi::Place& place, void* stream) {
  std::lock_guard<std::mutex> lock_gurad(xpu_mutex_);
  if (xpu_resources_.count(stream)) {
    // Already tracked: this caller becomes one more owner.
    Increase(stream);
    return stream;
  } else {
    std::unique_ptr<XPUContextResource> resource{
        new XPUContextResource(place, stream)};
    // Key by the resource's own stream handle (equals `stream` when one was
    // supplied by the caller).
    void* s = resource->GetStream();
    ref_count_[s] = 1;
    xpu_resources_.emplace(s, std::move(resource));
    return s;
  }
}

// Returns the resource registered for `stream`; throws InvalidArgument if
// the stream was never registered through InitXPUResource.
// NOTE(review): unlike InitXPUResource, no lock is taken here — confirm
// lookups cannot race with registration/destruction.
XPUContextResource* ResourceManager::GetXPUResource(void* stream) const {
  PADDLE_ENFORCE_EQ(xpu_resources_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in xpu_resources.", stream));
  return xpu_resources_.at(stream).get();
}

// Re-keys the resource registered under `old_stream` so it is tracked under
// `new_stream`, carrying this caller's reference along with it.  Requires
// that this was the only outstanding reference to old_stream; otherwise the
// post-condition check below throws.
void ResourceManager::XpuResourceReBindStream(void* old_stream,
                                              void* new_stream) {
  PADDLE_ENFORCE_EQ(
      xpu_resources_.count(old_stream),
      true,
      platform::errors::InvalidArgument(
          "The stream[%p] not found in xpu_resources.", old_stream));
  // Take ownership of the resource before dropping the old entry, so it
  // survives the map erase performed inside DestroyXPUResource/Decrease.
  auto xpu_resource = std::move(xpu_resources_.at(old_stream));
  DestroyXPUResource(old_stream);
  PADDLE_ENFORCE_EQ(
      ref_count_.count(old_stream),
      0,
      platform::errors::Fatal("xpu resources rebind stream failed."));

  xpu_resource->ReBindStream(new_stream);
  // NOTE(review): if new_stream is already a key in xpu_resources_, the
  // emplace below is a no-op and the moved resource is discarded while the
  // reference count still increments — confirm callers never rebind onto a
  // stream that is already registered.
  ref_count_[new_stream]++;
  xpu_resources_.emplace(new_stream, std::move(xpu_resource));
}

// Drops one reference to the resource registered under `stream`; the
// underlying XPUContextResource is released once the count reaches zero
// (see Decrease).  Throws InvalidArgument for an unknown stream.
void ResourceManager::DestroyXPUResource(void* stream) {
  PADDLE_ENFORCE_EQ(xpu_resources_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in xpu_resources.", stream));
  Decrease(stream);
}

// Decrements the reference count for `stream`, releasing both the count
// entry and the XPUContextResource once it reaches zero.
// NOTE(review): the decrement and the subsequent zero-check are two separate
// operations and no mutex is held here — confirm callers serialize access.
void ResourceManager::Decrease(void* stream) {
  PADDLE_ENFORCE_EQ(ref_count_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in ref_count.", stream));
  --ref_count_[stream];
  if (ref_count_[stream] == 0) {
    ref_count_.erase(stream);
    xpu_resources_.erase(stream);
  }
}

// Increments the reference count of an already-registered stream.
void ResourceManager::Increase(void* stream) {
  PADDLE_ENFORCE_EQ(ref_count_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in ref_count.", stream));
  ++ref_count_[stream];
}

// Returns the current reference count for `stream`, or 0 when unknown.
// NOTE(review): an identically named RefCount is also compiled in the GPU
// path of this file — confirm the XPU and GPU macros are mutually exclusive
// in a single build, otherwise these definitions collide.
int ResourceManager::RefCount(void* stream) const {
  if (ref_count_.count(stream) == 0) return 0;
  return ref_count_.at(stream);
}
// Resource Manager End.

#endif
} // namespace paddle
49 changes: 49 additions & 0 deletions paddle/fluid/inference/api/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ class GPUContextResource {
};
#endif

#if defined(PADDLE_WITH_XPU)
// Per-stream XPU state tracked by ResourceManager: the (externally owned)
// stream handle plus cached device/driver/runtime version numbers.
class XPUContextResource {
 public:
  explicit XPUContextResource(const phi::Place& place, void* stream);
  ~XPUContextResource();
  phi::Place Place() const;
  void* GetStream() const;
  int GetDriverVersion() const;
  int GetRuntimeVersion() const;
  int GetXpuVersion() const;
  // Re-points this resource at a different, externally owned stream.
  void ReBindStream(void* stream);

 private:
  void InitXPUResource(void* stream);
  void InitXpuProperties();

 private:
  bool owned_stream_{true};
  // In-class initializer prevents GetStream() from returning an
  // indeterminate pointer: InitXPUResource only assigns stream_ when a
  // non-null stream is supplied.
  void* stream_{nullptr};
  phi::Place place_;

  // Populated by InitXpuProperties(); zero-initialized for safety.
  int driver_version_{0};
  int runtime_version_{0};
  int xpu_version_{0};
};  // class XPUContextResource
#endif

class ResourceManager {
public:
ResourceManager() = default;
Expand Down Expand Up @@ -173,6 +200,28 @@ class ResourceManager {
gpu_resources_;
#endif

// XPU Resource
#if defined(PADDLE_WITH_XPU)

public:
void* InitXPUResource(const phi::Place& place, void* stream);
void DestroyXPUResource(void* stream);
XPUContextResource* GetXPUResource(void* stream) const;
int RefCount(void* stream) const;
void XpuResourceReBindStream(void* old_stream, void* new_stream);

private:
void Decrease(void* stream);
void Increase(void* stream);

private:
std::mutex xpu_mutex_;
// a stream corresponding to a series of resource.
std::map<void* /*stream*/, std::atomic<int>> ref_count_;
std::map<void* /*stream*/, std::unique_ptr<XPUContextResource>>
xpu_resources_;
#endif

private:
DISABLE_COPY_AND_ASSIGN(ResourceManager);
};
Expand Down
Loading

0 comments on commit 99399f3

Please sign in to comment.