diff --git a/paddle/fluid/inference/api/analysis_predictor.cc b/paddle/fluid/inference/api/analysis_predictor.cc index 30734e38b3e8f..928153dab5e62 100644 --- a/paddle/fluid/inference/api/analysis_predictor.cc +++ b/paddle/fluid/inference/api/analysis_predictor.cc @@ -334,6 +334,26 @@ bool AnalysisPredictor::Init( InitDeviceContexts(); } } +#endif +#if defined(PADDLE_WITH_XPU) + if (config_.use_xpu_ && config_.use_external_stream_) { + private_context_ = true; + } + if (private_context_) { + if (!status_is_cloned_) { + predictor_stream_ = config_.GetExecStream(); + } + // NOTE: If the external_stream equals to global_device_contexts's stream, + // then fallback. + auto global_stream = + static_cast( + platform::DeviceContextPool::Instance().Get(place_)) + ->stream(); + if (predictor_stream_ != global_stream) { + InitResourceManager(predictor_stream_); + InitDeviceContexts(); + } + } #endif inference::DisplayMemoryInfo(place_, "Init predictor"); return true; @@ -418,6 +438,9 @@ void AnalysisPredictor::InitResourceManager(void *stream) { #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) predictor_stream_ = ResourceManager::Instance().InitGPUResource(place_, stream); +#elif defined(PADDLE_WITH_XPU) + predictor_stream_ = + ResourceManager::Instance().InitXPUResource(place_, stream); #endif } @@ -487,6 +510,32 @@ void AnalysisPredictor::InitDeviceContexts() { return std::unique_ptr(gpu_context); })); } +#endif +#if defined(PADDLE_WITH_XPU) + if (place_.GetType() == phi::AllocationType::XPU) { + device_contexts_.emplace( + place_, std::async(std::launch::deferred, [=] { + auto *xpu_resource = + ResourceManager::Instance().GetXPUResource(predictor_stream_); + auto &instance = memory::allocation::AllocatorFacade::Instance(); + auto *xpu_context = new InferXPUContext(place_); + xpu_context->SetAllocator(instance.GetAllocator(place_).get()); + xpu_context->SetGenerator( + phi::DefaultXPUGenerator(place_.GetDeviceId()).get()); + xpu_context->SetHostAllocator( + instance.GetAllocator(platform::CPUPlace()).get()); + xpu_context->SetHostGenerator(phi::DefaultCPUGenerator().get()); + xpu_context->SetZeroAllocator( + instance.GetZeroAllocator(place_).get()); + xpu_context->SetHostZeroAllocator( + instance.GetZeroAllocator(platform::CPUPlace()).get()); + xpu_context->SetStream(xpu_resource->GetStream()); + xpu_context->SetDriverVersion(xpu_resource->GetDriverVersion()); + xpu_context->SetRuntimeVersion(xpu_resource->GetRuntimeVersion()); + xpu_context->SetXpuVersion(xpu_resource->GetXpuVersion()); + return std::unique_ptr(xpu_context); + })); + } #endif // TODO(Inference): Support other backends. } @@ -506,10 +555,14 @@ void *AnalysisPredictor::GetExecStream() const { #endif #if defined(PADDLE_WITH_XPU) if (place_.GetType() == phi::AllocationType::XPU) { - paddle::platform::DeviceContextPool &pool = - paddle::platform::DeviceContextPool::Instance(); - return reinterpret_cast(pool.Get(place_)) - ->stream(); + if (private_context_) { + return predictor_stream_; + } else { + paddle::platform::DeviceContextPool &pool = + paddle::platform::DeviceContextPool::Instance(); + return reinterpret_cast(pool.Get(place_)) + ->stream(); + } } #endif // TODO(inference): Support other backends. @@ -2050,6 +2103,33 @@ bool AnalysisPredictor::ExpRunWithExternalStream(const gpuStream_t stream) { } #endif +bool AnalysisPredictor::ExpRunWithExternalStream(void *stream) { +#if defined(PADDLE_WITH_XPU) + if (!private_context_) { + PADDLE_THROW(platform::errors::Fatal( + "Please use config.SetExecStream to init resources, and then we " + "will bind resources to execution stream.")); + } + if (stream != predictor_stream_) { + paddle::platform::XPUStreamSync( + static_cast(predictor_stream_)); + ResourceManager::Instance().XpuResourceReBindStream(predictor_stream_, + stream); + predictor_stream_ = stream; + + auto *dev_ctxs = reinterpret_cast>> *>( + this->GetDeviceContexts()); + auto *dev_ctx = + static_cast(dev_ctxs->at(place_).get().get()); + dev_ctx->SetStream(stream); + } + return ZeroCopyRun(); +#endif + return false; +} + void AnalysisPredictor::CollectShapeRangeInfo() { // if use gpu, sync first. paddle::platform::DeviceContextPool &pool = @@ -2413,7 +2493,12 @@ AnalysisPredictor::~AnalysisPredictor() { if (predictor_stream_ != nullptr) { ResourceManager::Instance().DestroyGPUResource(predictor_stream_); } +#elif defined(PADDLE_WITH_XPU) + if (predictor_stream_ != nullptr) { + ResourceManager::Instance().DestroyXPUResource(predictor_stream_); + } #endif + if (place_.GetType() != phi::AllocationType::UNDEFINED) { memory::Release(place_); } @@ -2922,6 +3007,11 @@ bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p, #endif return false; } +bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p, + void *stream) { + auto pred = dynamic_cast(p->predictor_.get()); + return pred->ExpRunWithExternalStream(stream); +} void InternalUtils::UpdateConfigInterleaved(paddle_infer::Config *c, bool with_interleaved) { diff --git a/paddle/fluid/inference/api/analysis_predictor.h b/paddle/fluid/inference/api/analysis_predictor.h index 83207a8bfd654..185aadbc6abf9 100644 --- a/paddle/fluid/inference/api/analysis_predictor.h +++ b/paddle/fluid/inference/api/analysis_predictor.h @@ -225,6 +225,9 @@ class AnalysisPredictor : public PaddlePredictor { bool ExpRunWithExternalStream(const gpuStream_t stream); #endif + // Note: Can only be used under thread_local semantics. + bool ExpRunWithExternalStream(void *stream); + /// /// \brief Get the execution stream on devices with a concept of stream, /// otherwise returns nullptr. diff --git a/paddle/fluid/inference/api/infer_context.cc b/paddle/fluid/inference/api/infer_context.cc index 9a76528a48edb..b56adddfa4e78 100644 --- a/paddle/fluid/inference/api/infer_context.cc +++ b/paddle/fluid/inference/api/infer_context.cc @@ -22,4 +22,9 @@ InferGPUContext::InferGPUContext(const phi::Place& place) : phi::GPUContext(place, false) {} #endif +#if defined(PADDLE_WITH_XPU) +InferXPUContext::InferXPUContext(const phi::Place& place) + : phi::XPUContext(place) {} +#endif + } // namespace paddle diff --git a/paddle/fluid/inference/api/infer_context.h b/paddle/fluid/inference/api/infer_context.h index 5dbcf355a6e27..130fd8c8d4869 100644 --- a/paddle/fluid/inference/api/infer_context.h +++ b/paddle/fluid/inference/api/infer_context.h @@ -45,4 +45,15 @@ class InferGPUContext : public phi::GPUContext { using phi::GPUContext::SetRuntimeVersion; }; #endif + +#if defined(PADDLE_WITH_XPU) +class InferXPUContext : public phi::XPUContext { + public: + explicit InferXPUContext(const phi::Place& place); + using phi::XPUContext::SetDriverVersion; + using phi::XPUContext::SetRuntimeVersion; + using phi::XPUContext::SetStream; + using phi::XPUContext::SetXpuVersion; +}; +#endif } // namespace paddle diff --git a/paddle/fluid/inference/api/paddle_api.h b/paddle/fluid/inference/api/paddle_api.h index 3a51f91b3afc2..ee15468c9b81e 100644 --- a/paddle/fluid/inference/api/paddle_api.h +++ b/paddle/fluid/inference/api/paddle_api.h @@ -480,7 +480,8 @@ class PD_INFER_DECL InternalUtils { cudaStream_t stream); static bool RunWithExternalStream(paddle_infer::Predictor* pred, hipStream_t stream); - + static bool RunWithExternalStream(paddle_infer::Predictor* pred, + void* stream); static void UpdateConfigInterleaved(paddle_infer::Config* c, bool with_interleaved); diff --git a/paddle/fluid/inference/api/resource_manager.cc b/paddle/fluid/inference/api/resource_manager.cc index 49d33d6750abe..0f3612a5e976c 100644 --- a/paddle/fluid/inference/api/resource_manager.cc +++ b/paddle/fluid/inference/api/resource_manager.cc @@ -41,6 +41,9 @@ #include "paddle/phi/backends/dynload/cusparse.h" #endif // PADDLE_WITH_CUDA +#ifdef PADDLE_WITH_XPU +#include "paddle/phi/backends/xpu/xpu_info.h" +#endif namespace paddle { namespace internal { @@ -545,4 +548,121 @@ int ResourceManager::RefCount(void* stream) const { } #endif +#if defined(PADDLE_WITH_XPU) +// XPUContextResource +XPUContextResource::XPUContextResource(const phi::Place& place, void* stream) + : place_(place) { + InitXPUResource(stream); +} + +XPUContextResource::~XPUContextResource() {} + +void XPUContextResource::InitXPUResource(void* stream) { + phi::backends::xpu::XPUDeviceGuard guard(place_.device); + if (stream) { + owned_stream_ = false; + stream_ = stream; + } + InitXpuProperties(); +} + +void XPUContextResource::InitXpuProperties() { + phi::backends::xpu::XPUDeviceGuard guard(place_.device); + driver_version_ = phi::backends::xpu::GetDriverVersion(); + runtime_version_ = phi::backends::xpu::GetRuntimeVersion(); + xpu_version_ = + static_cast(phi::backends::xpu::get_xpu_version(place_.device)); +} +void* XPUContextResource::GetStream() const { return stream_; } + +int XPUContextResource::GetDriverVersion() const { return driver_version_; } + +int XPUContextResource::GetRuntimeVersion() const { return runtime_version_; } + +int XPUContextResource::GetXpuVersion() const { return xpu_version_; } + +void XPUContextResource::ReBindStream(void* stream) { + owned_stream_ = false; + stream_ = stream; +} +// XPUContextResource End. + +// Resource Manager +void* ResourceManager::InitXPUResource(const phi::Place& place, void* stream) { + std::lock_guard lock_gurad(xpu_mutex_); + if (xpu_resources_.count(stream)) { + Increase(stream); + return stream; + } else { + std::unique_ptr resource{ + new XPUContextResource(place, stream)}; + void* s = resource->GetStream(); + ref_count_[s] = 1; + xpu_resources_.emplace(s, std::move(resource)); + return s; + } +} + +XPUContextResource* ResourceManager::GetXPUResource(void* stream) const { + PADDLE_ENFORCE_EQ(xpu_resources_.count(stream), + true, + platform::errors::InvalidArgument( + "The stream[%p] not found in xpu_resources.", stream)); + return xpu_resources_.at(stream).get(); +} + +void ResourceManager::XpuResourceReBindStream(void* old_stream, + void* new_stream) { + PADDLE_ENFORCE_EQ( + xpu_resources_.count(old_stream), + true, + platform::errors::InvalidArgument( + "The stream[%p] not found in xpu_resources.", old_stream)); + auto xpu_resource = std::move(xpu_resources_.at(old_stream)); + DestroyXPUResource(old_stream); + PADDLE_ENFORCE_EQ( + ref_count_.count(old_stream), + 0, + platform::errors::Fatal("xpu resources rebind stream failed.")); + + xpu_resource->ReBindStream(new_stream); + ref_count_[new_stream]++; + xpu_resources_.emplace(new_stream, std::move(xpu_resource)); +} + +void ResourceManager::DestroyXPUResource(void* stream) { + PADDLE_ENFORCE_EQ(xpu_resources_.count(stream), + true, + platform::errors::InvalidArgument( + "The stream[%p] not found in xpu_resources.", stream)); + Decrease(stream); +} + +void ResourceManager::Decrease(void* stream) { + PADDLE_ENFORCE_EQ(ref_count_.count(stream), + true, + platform::errors::InvalidArgument( + "The stream[%p] not found in ref_count.", stream)); + --ref_count_[stream]; + if (ref_count_[stream] == 0) { + ref_count_.erase(stream); + xpu_resources_.erase(stream); + } +} + +void ResourceManager::Increase(void* stream) { + PADDLE_ENFORCE_EQ(ref_count_.count(stream), + true, + platform::errors::InvalidArgument( + "The stream[%p] not found in ref_count.", stream)); + ++ref_count_[stream]; +} + +int ResourceManager::RefCount(void* stream) const { + if (ref_count_.count(stream) == 0) return 0; + return ref_count_.at(stream); +} +// Resource Manager End. + +#endif } // namespace paddle diff --git a/paddle/fluid/inference/api/resource_manager.h b/paddle/fluid/inference/api/resource_manager.h index 6655324d305b6..119b5d94dc56c 100644 --- a/paddle/fluid/inference/api/resource_manager.h +++ b/paddle/fluid/inference/api/resource_manager.h @@ -134,6 +134,33 @@ class GPUContextResource { }; #endif +#if defined(PADDLE_WITH_XPU) +class XPUContextResource { + public: + explicit XPUContextResource(const phi::Place& place, void* stream); + ~XPUContextResource(); + phi::Place Place() const; + void* GetStream() const; + int GetDriverVersion() const; + int GetRuntimeVersion() const; + int GetXpuVersion() const; + void ReBindStream(void* stream); + + private: + void InitXPUResource(void* stream); + void InitXpuProperties(); + + private: + bool owned_stream_{true}; + void* stream_; + phi::Place place_; + + int driver_version_; + int runtime_version_; + int xpu_version_; +}; // class XPUContextResource +#endif + class ResourceManager { public: ResourceManager() = default; @@ -173,6 +200,28 @@ class ResourceManager { gpu_resources_; #endif +// XPU Resource +#if defined(PADDLE_WITH_XPU) + + public: + void* InitXPUResource(const phi::Place& place, void* stream); + void DestroyXPUResource(void* stream); + XPUContextResource* GetXPUResource(void* stream) const; + int RefCount(void* stream) const; + void XpuResourceReBindStream(void* old_stream, void* new_stream); + + private: + void Decrease(void* stream); + void Increase(void* stream); + + private: + std::mutex xpu_mutex_; + // a stream corresponding to a series of resource. + std::map> ref_count_; + std::map> + xpu_resources_; +#endif + private: DISABLE_COPY_AND_ASSIGN(ResourceManager); }; diff --git a/paddle/phi/backends/xpu/xpu_context.cc b/paddle/phi/backends/xpu/xpu_context.cc index b1d768f8445b8..4c0727088f662 100644 --- a/paddle/phi/backends/xpu/xpu_context.cc +++ b/paddle/phi/backends/xpu/xpu_context.cc @@ -76,7 +76,7 @@ struct XPUContext::Impl { if (owned_ && context_ != nullptr) { backends::xpu::XPUDeviceGuard guard(place_.GetDeviceId()); xpu_wait(context_->xpu_stream); - if (context_->xpu_stream) { + if (context_->xpu_stream && stream_owned_) { // manually destroy XPUStream here until xpu::api integrates this work // into Context dtor xpu_stream_destroy(context_->xpu_stream); @@ -111,6 +111,12 @@ struct XPUContext::Impl { return context_->xpu_stream; } + // Set external stream for context + void SetStream(void* stream) { + stream_owned_ = false; + context_->set_stream(static_cast(stream)); + } + xpu::Context* GetXContext() const { PD_CHECK(context_ != nullptr, "the xpu context is nullptr."); return context_; @@ -179,6 +185,7 @@ struct XPUContext::Impl { return; } PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(&context_->xpu_stream)); + stream_owned_ = true; } // Methods of XPU Dataloader threads contexts map, @@ -221,8 +228,11 @@ struct XPUContext::Impl { } bool owned_{false}; + bool stream_owned_{false}; Place place_; backends::xpu::XPUVersion xpu_version_; + int runtime_version_; + int driver_version_; xpu::Context* context_{nullptr}; std::unordered_map xdl_context_map_; @@ -246,6 +256,20 @@ const Place& XPUContext::GetPlace() const { return impl_->GetPlace(); } XPUStream XPUContext::stream() const { return impl_->stream(); } +void XPUContext::SetStream(void* stream) { impl_->SetStream(stream); } + +void XPUContext::SetXpuVersion(int version) { + impl_->xpu_version_ = static_cast(version); +} + +void XPUContext::SetRuntimeVersion(int version) { + impl_->runtime_version_ = version; +} + +void XPUContext::SetDriverVersion(int version) { + impl_->driver_version_ = version; +} + backends::xpu::XPUVersion XPUContext::xpu_version() const { return impl_->xpu_version_; } diff --git a/paddle/phi/backends/xpu/xpu_context.h b/paddle/phi/backends/xpu/xpu_context.h index 349118f33362b..fa4899d9871e4 100644 --- a/paddle/phi/backends/xpu/xpu_context.h +++ b/paddle/phi/backends/xpu/xpu_context.h @@ -56,6 +56,9 @@ class XPUContext : public DeviceContext, void SetBkclContext(xpu::BKCLContext_t context); void CreateStream(); + // For share external stream. + void SetStream(void* stream); + // Wait for all operations completion in the stream. void Wait() const override; @@ -73,6 +76,12 @@ class XPUContext : public DeviceContext, void SetL3Cache(int l3_size = 14155776); + void SetXpuVersion(int version); + + void SetRuntimeVersion(int runtime_version); + + void SetDriverVersion(int driver_version); + Eigen::DefaultDevice* eigen_device() const { return nullptr; } XPUStream stream() const; diff --git a/test/cpp/inference/api/analysis_predictor_tester.cc b/test/cpp/inference/api/analysis_predictor_tester.cc index 5692d7607bba3..e6b5630dccad6 100644 --- a/test/cpp/inference/api/analysis_predictor_tester.cc +++ b/test/cpp/inference/api/analysis_predictor_tester.cc @@ -17,6 +17,10 @@ #if defined(PADDLE_WITH_CUDA) #include #endif +#if defined(PADDLE_WITH_XPU) +#include "xpu/runtime.h" +#include "xpu/xdnn.h" +#endif #include #include @@ -654,6 +658,57 @@ TEST(Predictor, Streams) { } #endif +#if defined(PADDLE_WITH_XPU) +TEST(Predictor, XPUStreams) { + // external stream + { + auto context = baidu::xpu::api::create_context(); + xpu_stream_create(&context->xpu_stream); + + Config config; + config.SetModel(FLAGS_dirname); + config.EnableXpu(); + config.SetExecStream(static_cast(context->xpu_stream)); + CHECK_EQ(config.external_stream_enabled(), true); + + auto predictor = CreatePredictor(config); + auto stream = predictor->GetExecStream(); + CHECK_EQ(static_cast(context->xpu_stream), stream); + CHECK_NOTNULL(paddle::ResourceManager::Instance().GetXPUResource(stream)); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 1); + } + + // 2 predictor on 2 stream + { + auto context1 = baidu::xpu::api::create_context(); + xpu_stream_create(&context1->xpu_stream); + + Config config; + config.SetModel(FLAGS_dirname); + config.EnableXpu(); + config.SetExecStream(static_cast(context1->xpu_stream)); + auto predictor = CreatePredictor(config); + auto stream1 = predictor->GetExecStream(); + CHECK_NOTNULL(paddle::ResourceManager::Instance().GetXPUResource(stream1)); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream1), 1); + + auto context2 = baidu::xpu::api::create_context(); + xpu_stream_create(&context2->xpu_stream); + + Config config2; + config2.SetModel(FLAGS_dirname); + config2.EnableXpu(); + config2.SetExecStream(static_cast(context2->xpu_stream)); + auto predictor2 = CreatePredictor(config2); + auto stream2 = predictor2->GetExecStream(); + CHECK_NOTNULL(paddle::ResourceManager::Instance().GetXPUResource(stream2)); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 1); + + CHECK_NE(stream1, stream2); + } +} +#endif + TEST(AnalysisPredictor, OutputHookFunc) { auto hookfunc = [](const std::string& type, const std::string& var_name,