Skip to content

Commit

Permalink
Add unit test to call FinishSession() in async callback of HttpClient
Browse files Browse the repository at this point in the history
Signed-off-by: owent <admin@owent.net>
  • Loading branch information
owent committed Apr 28, 2022
1 parent 090780a commit 83fe42b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ class Response : public opentelemetry::ext::http::client::Response

class HttpClient;

class Session : public opentelemetry::ext::http::client::Session
class Session : public opentelemetry::ext::http::client::Session,
public std::enable_shared_from_this<Session>
{
public:
Session(HttpClient &http_client,
Expand Down
14 changes: 8 additions & 6 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,17 @@ void HttpClient::MaybeSpawnBackgroundThread()

if (msg->msg == CURLMSG_DONE)
{
CURL *easy_handle = msg->easy_handle;
CURLcode result = msg->data.result;
HttpOperation *operation = nullptr;
curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &operation);
CURL *easy_handle = msg->easy_handle;
CURLcode result = msg->data.result;
Session *session = nullptr;
curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &session);
// If it's already moved into pending_to_remove_session_handles_, we just ingore this
// message.
if (nullptr != operation)
if (nullptr != session && session->GetOperation())
{
operation->PerformCurlMessage(result);
// Session can not be destroyed when calling PerformCurlMessage
auto hold_session = session->shared_from_this();
session->GetOperation()->PerformCurlMessage(result);
}
}
} while (true);
Expand Down
7 changes: 3 additions & 4 deletions ext/src/http/client/curl/http_operation_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,9 @@ void HttpOperation::Cleanup()
callback.swap(async_data_->callback);
if (callback)
{
async_data_->callback_thread == std::this_thread::get_id();
async_data_->callback_thread = std::this_thread::get_id();
callback(*this);
async_data_->callback_thread == std::thread::id();
async_data_->callback_thread = std::thread::id();
}

// Set value to promise to continue Finish()
Expand Down Expand Up @@ -403,8 +403,6 @@ CURLcode HttpOperation::Setup()
return CURLE_FAILED_INIT;
}

curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PRIVATE, this);

curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_VERBOSE, 0);

// Specify target URL
Expand Down Expand Up @@ -557,6 +555,7 @@ CURLcode HttpOperation::SendAsync(Session *session, std::function<void(HttpOpera
{
return code;
}
curl_easy_setopt(curl_resource_.easy_handle, CURLOPT_PRIVATE, session);

DispatchEvent(opentelemetry::ext::http::client::SessionState::Connecting);
is_finished_.store(false, std::memory_order_release);
Expand Down
69 changes: 66 additions & 3 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,29 @@ class PostEventHandler : public CustomEventHandler
}
};

class FinishInCallbackHandler : public CustomEventHandler
{
public:
FinishInCallbackHandler(std::shared_ptr<http_client::Session> session) : session_(session) {}

void OnResponse(http_client::Response &response) noexcept override
{
ASSERT_EQ(200, response.GetStatusCode());
ASSERT_EQ(response.GetBody().size(), 0);
is_called_ = true;
got_response_ = true;

if (session_)
{
session_->FinishSession();
session_.reset();
}
}

private:
std::shared_ptr<http_client::Session> session_;
};

class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRequestCallback
{
protected:
Expand Down Expand Up @@ -125,7 +148,7 @@ class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRe
response.headers["Content-Type"] = "text/plain";
response_status = 200;
}
if (request.uri == "/post/")
else if (request.uri == "/post/")
{
std::unique_lock<std::mutex> lk(mtx_requests);
received_requests_.push_back(request);
Expand All @@ -142,8 +165,10 @@ class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRe
bool waitForRequests(unsigned timeOutSec, unsigned expected_count = 1)
{
std::unique_lock<std::mutex> lk(mtx_requests);
if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec),
[&] { return received_requests_.size() >= expected_count; }))
if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec), [&] {
//
return received_requests_.size() >= expected_count;
}))
{
return true;
}
Expand Down Expand Up @@ -452,3 +477,41 @@ TEST_F(BasicCurlHttpTests, SendPostRequestAsync)
http_client.WaitBackgroundThreadExit();
}
}

TEST_F(BasicCurlHttpTests, FinishInAsyncCallback)
{
curl::HttpClient http_client;

for (int round = 0; round < 2; ++round)
{
received_requests_.clear();
static constexpr const unsigned batch_count = 5;
std::shared_ptr<http_client::Session> sessions[batch_count];
std::shared_ptr<FinishInCallbackHandler> handlers[batch_count];
for (unsigned i = 0; i < batch_count; ++i)
{
sessions[i] = http_client.CreateSession("http://127.0.0.1:19000/get/");
auto request = sessions[i]->CreateRequest();
request->SetMethod(http_client::Method::Get);
request->SetUri("get/");

handlers[i] = std::make_shared<FinishInCallbackHandler>(sessions[i]);

// Lock mtx_requests to prevent response, we will check IsSessionActive() in the end
std::unique_lock<std::mutex> lock_requests(mtx_requests);
sessions[i]->SendRequest(handlers[i]);
ASSERT_TRUE(sessions[i]->IsSessionActive());
}

http_client.WaitBackgroundThreadExit();
ASSERT_TRUE(waitForRequests(300, batch_count));

for (unsigned i = 0; i < batch_count; ++i)
{
ASSERT_FALSE(sessions[i]->IsSessionActive());

ASSERT_TRUE(handlers[i]->is_called_);
ASSERT_TRUE(handlers[i]->got_response_);
}
}
}

0 comments on commit 83fe42b

Please sign in to comment.