diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index e0898d3e47..af41672647 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -356,9 +356,8 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient std::unique_ptr background_thread_; std::chrono::milliseconds scheduled_delay_milliseconds_; - std::condition_variable background_thread_waiter_cv_; - std::mutex background_thread_waiter_lock_; std::chrono::milliseconds background_thread_wait_for_; + std::atomic is_shutdown; nostd::shared_ptr curl_global_initializer_; }; diff --git a/ext/src/http/client/curl/http_client_curl.cc b/ext/src/http/client/curl/http_client_curl.cc index e8d2b417cf..01e0578e38 100644 --- a/ext/src/http/client/curl/http_client_curl.cc +++ b/ext/src/http/client/curl/http_client_curl.cc @@ -191,13 +191,15 @@ HttpClient::HttpClient() scheduled_delay_milliseconds_{std::chrono::milliseconds(256)}, background_thread_wait_for_{std::chrono::minutes{1}}, curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()) -{} +{ + is_shutdown.store(false); +} HttpClient::~HttpClient() { + is_shutdown.store(true, std::memory_order_release); while (true) { - background_thread_wait_for_ = std::chrono::milliseconds{0}; std::unique_ptr background_thread; { std::lock_guard lock_guard{background_thread_m_}; @@ -213,7 +215,6 @@ HttpClient::~HttpClient() } if (background_thread->joinable()) { - background_thread_waiter_cv_.notify_all(); background_thread->join(); } } @@ -239,7 +240,6 @@ std::shared_ptr HttpClient::CreateSes std::lock_guard lock_guard{sessions_m_}; sessions_.insert({session_id, session}); - background_thread_waiter_cv_.notify_all(); // FIXME: Session may leak if it do not call SendRequest return session; } @@ -350,18 +350,28 @@ void HttpClient::MaybeSpawnBackgroundThread() background_thread_.reset(new std::thread( [](HttpClient *self) { int still_running = 1; + std::chrono::system_clock::time_point last_free_job_timepoint = + std::chrono::system_clock::now(); while (true) { CURLMsg *msg; int queued; CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running); + + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + + auto wait_for = self->background_thread_wait_for_; + if (self->is_shutdown.load(std::memory_order_acquire)) + { + wait_for = std::chrono::milliseconds{0}; + } // According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we // can not curl_multi_perform it again if (mc != CURLM_OK) { self->resetMultiHandle(); } - else if (still_running) + else if (still_running || now - last_free_job_timepoint < wait_for) { // curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util // timeout to do the rest jobs @@ -422,22 +432,11 @@ void HttpClient::MaybeSpawnBackgroundThread() if (still_running > 0) { + last_free_job_timepoint = now; continue; } - // If there is no pending jobs, Exit flush thread if there is not data to flush more - // than one minute. - if (self->background_thread_wait_for_ != std::chrono::milliseconds{0}) - { - std::unique_lock lk{self->background_thread_waiter_lock_}; - if (self->background_thread_waiter_cv_.wait_for( - lk, self->background_thread_wait_for_) != std::cv_status::timeout) - { - continue; - } - } - - if (still_running == 0) + if (still_running == 0 && now - last_free_job_timepoint > wait_for) { std::lock_guard lock_guard{self->background_thread_m_}; // Double check, make sure no more pending sessions after locking background thread @@ -461,18 +460,16 @@ void HttpClient::MaybeSpawnBackgroundThread() still_running = 1; } - if (still_running > 0) - { - continue; - } - // If there is no pending jobs, we can stop the background thread. - if (self->background_thread_) + if (still_running == 0) { - self->background_thread_->detach(); - self->background_thread_.reset(); + if (self->background_thread_) + { + self->background_thread_->detach(); + self->background_thread_.reset(); + } + break; } - break; } } }, @@ -533,6 +530,7 @@ void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms) void HttpClient::WaitBackgroundThreadExit() { + is_shutdown.store(true, std::memory_order_release); std::unique_ptr background_thread; { std::lock_guard lock_guard{background_thread_m_}; @@ -541,12 +539,9 @@ void HttpClient::WaitBackgroundThreadExit() if (background_thread && background_thread->joinable()) { - auto wait_for = background_thread_wait_for_; - background_thread_wait_for_ = std::chrono::milliseconds{0}; - background_thread_waiter_cv_.notify_all(); background_thread->join(); - background_thread_wait_for_ = wait_for; } + is_shutdown.store(false, std::memory_order_release); } void HttpClient::wakeupBackgroundThread() @@ -560,7 +555,6 @@ void HttpClient::wakeupBackgroundThread() curl_multi_wakeup(multi_handle_); } #endif - background_thread_waiter_cv_.notify_all(); } bool HttpClient::doAddSessions()