Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Profiler] Fix possible crash when Agent does not answer namedpipe connection #5437

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void EtwEventsHandler::OnConnect(HANDLE hPipe)
readSize = 0;
if (!ReadEvents(hPipe, buffer.get(), bufferSize, readSize))
{
_logger->Warn("Stop reading events");
_logger->Info("Stop reading events");
break;
}

Expand Down Expand Up @@ -155,7 +155,7 @@ bool EtwEventsHandler::ReadEvents(HANDLE hPipe, uint8_t* pBuffer, DWORD bufferSi
{
if (lastError == ERROR_BROKEN_PIPE)
{
_logger->Warn("Disconnected named pipe client...");
_logger->Info("Disconnected named pipe client...");
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "IpcClient.h" // TODO: return codes should be defined in another shared header file
#include "IpcServer.h"
#include "..\SecurityDescriptorHelpers.h"

#include <iostream>
#include <sstream>
#include <memory>
Expand All @@ -13,14 +14,58 @@ IpcServer::IpcServer()
{
_showMessages = false;
_pHandler = nullptr;
_serverCount = 0;
_stopRequested.store(false);
_pLogger = nullptr;

_hInitializedEvent = ::CreateEvent(nullptr, true, false, nullptr);
}

// Stops the server, then releases the event used to signal that the named pipe has been initialized.
IpcServer::~IpcServer()
{
    // raise the stop flag and release the server side of the named pipe (if any)
    Stop();

    // CreateEvent() returns a null handle on failure: in that case there is nothing to close
    if (_hInitializedEvent != nullptr)
    {
        ::CloseHandle(_hInitializedEvent);
        _hInitializedEvent = nullptr;
    }
}

// Requests the server to stop and releases the server side of the named pipe.
//
// The stop flag is raised first; then, if a named pipe was successfully created, a short-lived
// client connection is opened on it to unblock a pending ConnectNamedPipe() call before the
// server handle is disconnected and closed.
void IpcServer::Stop()
{
    _stopRequested.store(true);

    // Nothing to unblock or clean up when there is no valid server pipe:
    //  - nullptr: the pipe has not been created or has already been cleaned up
    //  - INVALID_HANDLE_VALUE: CreateNamedPipeA() failed and the handle was stored as is
    if ((_hNamedPipe == nullptr) || (_hNamedPipe == INVALID_HANDLE_VALUE))
    {
        return;
    }

    // we also need to close the handle to the named pipe the server is listening to in order to unblock
    // the ConnectNamedPipe() call and allow the server to really stop

    // connecting to the server pipe will unblock the ConnectNamedPipe() call
    HANDLE hPipe = ::CreateFileA(
        _portName.c_str(),
        GENERIC_READ | GENERIC_WRITE,
        0,
        nullptr,
        OPEN_EXISTING,
        0,
        nullptr);
    if (hPipe != INVALID_HANDLE_VALUE)
    {
        // the connection itself is enough: no data is exchanged
        ::CloseHandle(hPipe);
    }

    // cleanup the server pipe
    ::DisconnectNamedPipe(_hNamedPipe);
    ::CloseHandle(_hNamedPipe);
    _hNamedPipe = nullptr;
}

// Blocks the caller until the "initialized" event is signaled or the given timeout elapses.
// Does nothing when the event handle is null.
//
// timeoutMS: value passed as is to WaitForSingleObject() as the wait timeout
void IpcServer::WaitForNamedPipe(DWORD timeoutMS)
{
    const bool hasInitializedEvent = (_hInitializedEvent != nullptr);
    if (hasInitializedEvent)
    {
        // WaitForSingleObject() returns 0 when the event is signaled and WAIT_TIMEOUT when the timeout
        // is reached first; the result is not used by this method
        ::WaitForSingleObject(_hInitializedEvent, timeoutMS);
    }
}

IpcServer::IpcServer(IIpcLogger* pLogger,
Expand All @@ -37,8 +82,12 @@ IpcServer::IpcServer(IIpcLogger* pLogger,
_maxInstances = maxInstances;
_timeoutMS = timeoutMS;
_pHandler = pHandler;
_serverCount = 0;
_pLogger = pLogger;
_hNamedPipe = nullptr;
_showMessages = false;

// will be set when the named pipe is initialized
_hInitializedEvent = ::CreateEvent(nullptr, true, false, nullptr);
}

std::unique_ptr<IpcServer> IpcServer::StartAsync(
Expand All @@ -60,20 +109,22 @@ std::unique_ptr<IpcServer> IpcServer::StartAsync(
pLogger, portName, pHandler, inBufferSize, outBufferSize, maxInstances, timeoutMS
);

// let a threadpool thread process the command; allowing the server to process more incoming commands
// let a threadpool thread process the command because there is a blocking call to ConnectNamedPipe()
if (!::TrySubmitThreadpoolCallback(StartCallback, (PVOID)server.get(), nullptr))
{
server->ShowLastError("Impossible to add the Start callback into the threadpool...");
return nullptr;
}

// wait until the server has created the named pipe so the Agent will be able to connect
server->WaitForNamedPipe(200);

// wait a bit more so that the blocking ConnectNamedPipe call is made, ensuring that the Agent will be able to connect
::Sleep(100);

return server;
}

// Asks the server to stop by raising the stop flag.
void IpcServer::Stop()
{
    // assigning to the atomic performs the same sequentially consistent store
    _stopRequested = true;
}

void CALLBACK IpcServer::StartCallback(PTP_CALLBACK_INSTANCE instance, PVOID context)
{
Expand All @@ -91,81 +142,71 @@ void CALLBACK IpcServer::StartCallback(PTP_CALLBACK_INSTANCE instance, PVOID con
return;
}

while (!pThis->_stopRequested.load())
pThis->_hNamedPipe =
::CreateNamedPipeA(
pThis->_portName.c_str(),
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS,
pThis->_maxInstances,
pThis->_outBufferSize,
pThis->_inBufferSize,
pThis->_timeoutMS,
emptySA.get()
);

if (pThis->_hNamedPipe == INVALID_HANDLE_VALUE)
{
HANDLE hNamedPipe =
::CreateNamedPipeA(
pThis->_portName.c_str(),
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS,
pThis->_maxInstances,
pThis->_outBufferSize,
pThis->_inBufferSize,
pThis->_timeoutMS,
emptySA.get()
);

pThis->_serverCount++;

if (hNamedPipe == INVALID_HANDLE_VALUE)
{
pThis->ShowLastError("Failed to create named pipe...");
if (pThis->_pLogger != nullptr)
{
std::stringstream builder;
builder << "--> for server #" << pThis->_serverCount << "...";
pThis->_pLogger->Error(builder.str());
}

pThis->_pHandler->OnStartError();
return;
}

if (pThis->_pHandler != nullptr)
pThis->ShowLastError("Failed to create named pipe...");
if (pThis->_pLogger != nullptr)
{
std::stringstream builder;
builder << "Listening to server #" << pThis->_serverCount << "...";
pThis->_pLogger->Info(builder.str());
builder << "--> for server...";
pThis->_pLogger->Error(builder.str());
}

if (!::ConnectNamedPipe(hNamedPipe, nullptr) && ::GetLastError() != ERROR_PIPE_CONNECTED)
{
pThis->ShowLastError("ConnectNamedPipe failed...");
::CloseHandle(hNamedPipe);
pThis->_pHandler->OnStartError();
::SetEvent(pThis->_hInitializedEvent);
return;
}

pThis->_pHandler->OnConnectError();
return;
}
std::stringstream builder;
builder << "Listening to named pipe '" << pThis->_portName << "'...";
pThis->_pLogger->Info(builder.str());

auto pServerInfo = new ServerInfo();
pServerInfo->pThis = pThis;
pServerInfo->hPipe = hNamedPipe;
// the Agent can connect to the named pipe
::SetEvent(pThis->_hInitializedEvent);

// let a threadpool thread process the read/write communication; allowing the server to process more incoming connections
if (!::TrySubmitThreadpoolCallback(ConnectCallback, pServerInfo, nullptr))
{
delete pServerInfo;
// this is a blocking call waiting for the Agent to connect
// if the agent is not running, it is going to block until the pipe is closed
// --> the ETW manager should detect the agent is not running and close the pipe
if (!::ConnectNamedPipe(pThis->_hNamedPipe, nullptr) && ::GetLastError() != ERROR_PIPE_CONNECTED)
{
pThis->ShowLastError("ConnectNamedPipe failed...");
pThis->_pHandler->OnConnectError();

pThis->ShowLastError("Impossible to add the Connect callback into the threadpool...");
pThis->_pHandler->OnStartError();
return;
}
::CloseHandle(pThis->_hNamedPipe);
pThis->_hNamedPipe = nullptr;

return;
}
}

void CALLBACK IpcServer::ConnectCallback(PTP_CALLBACK_INSTANCE instance, PVOID context)
{
ServerInfo* pInfo = reinterpret_cast<ServerInfo*>(context);
IpcServer* pThis = pInfo->pThis;
HANDLE hPipe = pInfo->hPipe;
delete pInfo;
// it is possible that an error occurred when trying to connect to the Agent
// in that case, the server should stop
if (pThis->_stopRequested.load())
{
::CloseHandle(pThis->_hNamedPipe);
pThis->_hNamedPipe = nullptr;
pThis->_pHandler->OnConnectError();
return;
}

// this is a blocking call until the communication ends on this named pipe
pThis->_pHandler->OnConnect(hPipe);
pThis->_pHandler->OnConnect(pThis->_hNamedPipe);

// cleanup
::DisconnectNamedPipe(hPipe);
::CloseHandle(hPipe);
::DisconnectNamedPipe(pThis->_hNamedPipe);
::CloseHandle(pThis->_hNamedPipe);
pThis->_hNamedPipe = nullptr;
}

void IpcServer::ShowLastError(const char* message, uint32_t lastError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class IpcServer
uint32_t timeoutMS);
~IpcServer();

// TODO: remove this method and use Start instead in the sample application
static std::unique_ptr<IpcServer> StartAsync(
IIpcLogger* pLogger,
const std::string& portName,
Expand All @@ -34,11 +35,11 @@ class IpcServer
uint32_t outBufferSize,
uint32_t maxInstances = PIPE_UNLIMITED_INSTANCES,
uint32_t timeoutMS = NMPWAIT_USE_DEFAULT_WAIT);
void WaitForNamedPipe(DWORD timeoutMS);
void Stop();

private:
static void CALLBACK StartCallback(PTP_CALLBACK_INSTANCE instance, PVOID context);
static void CALLBACK ConnectCallback(PTP_CALLBACK_INSTANCE instance, PVOID context);
void ShowLastError(const char* message, uint32_t lastError = ::GetLastError());

private:
Expand All @@ -50,8 +51,11 @@ class IpcServer
uint32_t _timeoutMS;
INamedPipeHandler* _pHandler;
IIpcLogger* _pLogger;
HANDLE _hNamedPipe;

// will be set when the server is initialized
HANDLE _hInitializedEvent;

uint32_t _serverCount;
std::atomic<bool> _stopRequested = false;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,14 @@ bool EtwEventsManager::Start()
std::string pipeName = buffer.str();
Log::Info("Exposing ", pipeName);

// create the client part to send the registration command
_eventsHandler = std::make_unique<EtwEventsHandler>(_logger.get(), this);
_IpcServer = IpcServer::StartAsync(
_logger.get(),
pipeName,
_eventsHandler.get(),
(1 << 16) + sizeof(IpcHeader), // in buffer size = 64K + header
sizeof(SuccessResponse), // out buffer contains only the response
MaxInstances, // max number of instances (2 = the Agent + one pending)
1, // only one instance
TimeoutMS);
if (_IpcServer == nullptr)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,6 @@ void CorProfilerCallback::DisposeInternal()
ProfilerEngineStatus::WriteIsProfilerEngineActive(false);
_isInitialized.store(false);

// From that time, we need to ensure that ALL native threads are stop and don't call back to managed world
// So, don't sleep before stopping the threads

DisposeServices();

// Don't forget to stop the CLR events session if any
auto* pInfo = _pCorProfilerInfoEvents;
if (pInfo != nullptr)
Expand All @@ -670,6 +665,8 @@ void CorProfilerCallback::DisposeInternal()
_pEtwEventsManager->Stop();
}

DisposeServices();

ICorProfilerInfo5* pCorProfilerInfo = _pCorProfilerInfo;
if (pCorProfilerInfo != nullptr)
{
Expand Down Expand Up @@ -1424,7 +1421,7 @@ HRESULT STDMETHODCALLTYPE CorProfilerCallback::ThreadCreated(ThreadID threadId)
auto success = _pEtwEventsManager->Start();
if (!success)
{
Log::Error("Failed to the contact Datadog Agent named pipe dedicated to profiling. Try to install the latest version for lock contention and GC profiling.");
Log::Error("Failed to the contact Datadog Agent named pipe dedicated to profiling. Try to install the latest version.");

_pEtwEventsManager->Stop();
_pEtwEventsManager = nullptr;
Expand Down
Loading