Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
xiehuc committed Dec 10, 2024
1 parent a5b6817 commit 6260e42
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,8 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
std::unique_ptr<std::thread> background_thread_;
std::chrono::milliseconds scheduled_delay_milliseconds_;

std::condition_variable background_thread_waiter_cv_;
std::mutex background_thread_waiter_lock_;
std::chrono::milliseconds background_thread_wait_for_;
std::atomic<bool> is_shutdown;

nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
};
Expand Down
58 changes: 26 additions & 32 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,15 @@ HttpClient::HttpClient()
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
background_thread_wait_for_{std::chrono::minutes{1}},
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
{}
{
is_shutdown.store(false);
}

HttpClient::~HttpClient()
{
is_shutdown.store(true, std::memory_order_release);
while (true)
{
background_thread_wait_for_ = std::chrono::milliseconds{0};
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
Expand All @@ -213,7 +215,6 @@ HttpClient::~HttpClient()
}
if (background_thread->joinable())
{
background_thread_waiter_cv_.notify_all();
background_thread->join();
}
}
Expand All @@ -239,7 +240,6 @@ std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSes
std::lock_guard<std::mutex> lock_guard{sessions_m_};
sessions_.insert({session_id, session});

background_thread_waiter_cv_.notify_all();
// FIXME: Session may leak if it do not call SendRequest
return session;
}
Expand Down Expand Up @@ -350,18 +350,28 @@ void HttpClient::MaybeSpawnBackgroundThread()
background_thread_.reset(new std::thread(
[](HttpClient *self) {
int still_running = 1;
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
while (true)
{
CURLMsg *msg;
int queued;
CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running);

std::chrono::system_clock::time_point now = std::chrono::system_clock::now();

auto wait_for = self->background_thread_wait_for_;
if (self->is_shutdown.load(std::memory_order_acquire))
{
wait_for = std::chrono::milliseconds{0};
}
// According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we
// can not curl_multi_perform it again
if (mc != CURLM_OK)
{
self->resetMultiHandle();
}
else if (still_running)
else if (still_running || now - last_free_job_timepoint < wait_for)
{
// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
// timeout to do the rest jobs
Expand Down Expand Up @@ -422,22 +432,11 @@ void HttpClient::MaybeSpawnBackgroundThread()

if (still_running > 0)
{
last_free_job_timepoint = now;
continue;
}

// If there is no pending jobs, Exit flush thread if there is not data to flush more
// than one minute.
if (self->background_thread_wait_for_ != std::chrono::milliseconds{0})
{
std::unique_lock<std::mutex> lk{self->background_thread_waiter_lock_};
if (self->background_thread_waiter_cv_.wait_for(
lk, self->background_thread_wait_for_) != std::cv_status::timeout)
{
continue;
}
}

if (still_running == 0)
if (still_running == 0 && now - last_free_job_timepoint > wait_for)
{
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
// Double check, make sure no more pending sessions after locking background thread
Expand All @@ -461,18 +460,16 @@ void HttpClient::MaybeSpawnBackgroundThread()
still_running = 1;
}

if (still_running > 0)
{
continue;
}

// If there is no pending jobs, we can stop the background thread.
if (self->background_thread_)
if (still_running == 0)
{
self->background_thread_->detach();
self->background_thread_.reset();
if (self->background_thread_)
{
self->background_thread_->detach();
self->background_thread_.reset();
}
break;
}
break;
}
}
},
Expand Down Expand Up @@ -533,6 +530,7 @@ void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)

void HttpClient::WaitBackgroundThreadExit()
{
is_shutdown.store(true, std::memory_order_release);
std::unique_ptr<std::thread> background_thread;
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
Expand All @@ -541,12 +539,9 @@ void HttpClient::WaitBackgroundThreadExit()

if (background_thread && background_thread->joinable())
{
auto wait_for = background_thread_wait_for_;
background_thread_wait_for_ = std::chrono::milliseconds{0};
background_thread_waiter_cv_.notify_all();
background_thread->join();
background_thread_wait_for_ = wait_for;
}
is_shutdown.store(false, std::memory_order_release);
}

void HttpClient::wakeupBackgroundThread()
Expand All @@ -560,7 +555,6 @@ void HttpClient::wakeupBackgroundThread()
curl_multi_wakeup(multi_handle_);
}
#endif
background_thread_waiter_cv_.notify_all();
}

bool HttpClient::doAddSessions()
Expand Down

0 comments on commit 6260e42

Please sign in to comment.