Skip to content

Commit

Permalink
[Profiler] Provide the thread id that blocked another thread (#5959)
Browse files Browse the repository at this point in the history
  • Loading branch information
gleocadie authored Aug 29, 2024
1 parent d58fdb9 commit 338dd13
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ void ClrEventsParser::ParseContentionEvent(DWORD id, DWORD version, ULONG cbEven

_pContentionListener->OnContention(payload.DurationNs);
}

if ((id == EVENT_CONTENTION_START) && (version >= 2))
{
ContentionStartV2Payload payload{0};
ULONG offset = 0;
if (!Read<ContentionStartV2Payload>(payload, pEventData, cbEventData, offset))
{
return;
}

_pContentionListener->SetBlockingThread(payload.LockOwnerThreadID);
}
}

void ClrEventsParser::NotifySuspension(uint64_t timestamp, uint32_t number, uint32_t generation, uint64_t duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ struct ContentionStopV1Payload // for .NET Core/ 5+
double_t DurationNs; // Duration of the contention (without spinning)
};

// Payload of the ContentionStart event, version 2.
// ClrEventsParser::ParseContentionEvent reads it with Read<ContentionStartV2Payload>() when
// id == EVENT_CONTENTION_START and version >= 2, then forwards LockOwnerThreadID to
// IContentionListener::SetBlockingThread().
// NOTE(review): the fields are presumably filled straight from the raw event bytes, so their
// order and sizes must mirror the CLR event layout - confirm that the packing in effect for
// this header matches before adding or reordering fields.
struct ContentionStartV2Payload // for .NET Core/ 5+
{
uint8_t ContentionFlags; // 0 for managed; 1 for native.
uint16_t ClrInstanceId; // Unique ID for the instance of CLR.
uintptr_t LockId;
uintptr_t AssociatedObjectID;
// This is a copy/paste from the CLR, but the LockOwnerThreadID is not a ThreadID but an OS Thread Id
// (i.e. the OS id of the thread that currently owns the contended lock).
uint64_t LockOwnerThreadID;
};

struct GCStartPayload
{
uint32_t Count;
Expand Down Expand Up @@ -332,6 +342,7 @@ class ClrEventsParser
private:
const int EVENT_ALLOCATION_TICK = 10; // version 4 contains the size + reference
const int EVENT_CONTENTION_STOP = 91; // version 1 contains the duration in nanoseconds
const int EVENT_CONTENTION_START = 81; // version 2 contains the OS thread id of the thread that owns the lock

// Events emitted during garbage collection lifetime
// read https://medium.com/criteo-engineering/spying-on-net-garbage-collector-with-net-core-eventpipes-9f2a986d5705?source=friends_link&sk=baf9a7766fb5c7899b781f016803597f
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "IRuntimeIdStore.h"
#include "IThreadsCpuManager.h"
#include "IUpscaleProvider.h"
#include "ManagedThreadInfo.h"
#include "OsSpecificApi.h"
#include "Sample.h"
#include "SampleValueTypeProvider.h"
Expand Down Expand Up @@ -83,17 +84,23 @@ std::string ContentionProvider::GetBucket(double contentionDurationNs)
// .NET Framework implementation: the caller supplies the timestamp, the id of the contending
// thread and its call stack.
// No lock owner is known on this path, so 0 is passed as the blocking thread id; a value of 0
// means "unknown" and RawContentionSample does not emit the "blocking thread" label for it.
void ContentionProvider::OnContention(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, const std::vector<uintptr_t>& stack)
{
    // Exactly one sample per contention event: the former 4-argument call has been superseded
    // by the overload taking the blocking thread id.
    AddContentionSample(timestamp, threadId, contentionDurationNs, 0, stack);
}

// Remembers, on the ManagedThreadInfo of the calling thread, the OS thread id of the thread
// that owns the lock the caller is contending on. The stored value is consumed (and reset
// to 0) by OnContention(double) when the contention ends.
//
// osThreadId: OS thread id of the lock owner, as carried by the ContentionStart v2 payload.
void ContentionProvider::SetBlockingThread(uint64_t osThreadId)
{
    // CurrentThreadInfo is only assigned in ThreadAssignedToOSThread; if this thread was never
    // seen there, there is nowhere to store the id, so drop it instead of dereferencing null.
    const auto& currentThreadInfo = ManagedThreadInfo::CurrentThreadInfo;
    if (currentThreadInfo == nullptr)
    {
        return;
    }

    currentThreadInfo->SetBlockingThread(osThreadId);
}

// .NET synchronous implementation: we are expecting to be called from the same thread that is contending.
// It means that the current thread will be stack walking itself.
//
// The blocking thread id previously stored by SetBlockingThread() on the current thread is
// fetched and reset to 0 in one step, so it is attached to this sample only and cannot leak
// into a later contention on the same thread.
void ContentionProvider::OnContention(double contentionDurationNs)
{
    // 0 means "no known lock owner": used when no id was recorded or when the current thread
    // has no ManagedThreadInfo attached.
    uint64_t blockingThreadId = 0;

    const auto& currentThreadInfo = ManagedThreadInfo::CurrentThreadInfo;
    if (currentThreadInfo != nullptr)
    {
        blockingThreadId = currentThreadInfo->SetBlockingThread(0);
    }

    // Exactly one sample per contention event (timestamp 0 / thread id -1 / empty stack are the
    // placeholders this synchronous path has always passed).
    AddContentionSample(0, -1, contentionDurationNs, blockingThreadId, _emptyStack);
}

void ContentionProvider::AddContentionSample(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, const std::vector<uintptr_t>& stack)
void ContentionProvider::AddContentionSample(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, uint64_t blockingThreadId, const std::vector<uintptr_t>& stack)
{
_lockContentionsCountMetric->Incr();
_lockContentionsDurationMetric->Add(contentionDurationNs);
Expand Down Expand Up @@ -201,6 +208,7 @@ void ContentionProvider::AddContentionSample(uint64_t timestamp, uint32_t thread

rawSample.ContentionDuration = contentionDurationNs;
rawSample.Bucket = std::move(bucket);
rawSample.BlockingThread = blockingThreadId;

Add(std::move(rawSample));
_sampledLockContentionsCountMetric->Incr();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ class ContentionProvider :
// IContentionListener implementation
void OnContention(double contentionDurationNs) override;
void OnContention(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, const std::vector<uintptr_t>& stack) override;
void SetBlockingThread(uint64_t osThreadId) override;

// IUpscaleProvider implementation
UpscalingInfo GetInfo() override;

private:
static std::string GetBucket(double contentionDurationNs);
static std::vector<SampleValueType> SampleTypeDefinitions;
void AddContentionSample(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, const std::vector<uintptr_t>& stack);
void AddContentionSample(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, uint64_t blockingThreadId, const std::vector<uintptr_t>& stack);

private:
static std::vector<uintptr_t> _emptyStack;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1541,12 +1541,13 @@ HRESULT STDMETHODCALLTYPE CorProfilerCallback::ThreadAssignedToOSThread(ThreadID
dupOsThreadHandle = origOsThreadHandle;
#endif

#ifdef LINUX
auto threadInfo = _pManagedThreadList->GetOrCreate(managedThreadId);
// CurrentThreadInfo relies on the assumption that the native thread calling ThreadAssignedToOSThread/ThreadDestroyed
// is the same native thread assigned to the managed thread.
ManagedThreadInfo::CurrentThreadInfo = threadInfo;

#ifdef LINUX

if (_pCpuProfiler != nullptr)
{
_pCpuProfiler->RegisterThread(threadInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ class IContentionListener

virtual void OnContention(double contentionDurationNs) = 0;
virtual void OnContention(uint64_t timestamp, uint32_t threadId, double contentionDurationNs, const std::vector<uintptr_t>& stack) = 0;
virtual void SetBlockingThread(uint64_t osThreadId) = 0;
};
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ManagedThreadInfo::ManagedThreadInfo(ThreadID clrThreadId, ICorProfilerInfo4* pC
_isThreadDestroyed{false},
_traceContextTrackingInfo{},
_sharedMemoryArea{nullptr},
_info{pCorProfilerInfo}
_info{pCorProfilerInfo},
_blockingThreadId{0}
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ struct ManagedThreadInfo : public IThreadInfo
inline bool IsThreadDestroyed();
inline bool IsDestroyed();
inline void SetThreadDestroyed();
inline uint64_t SetBlockingThread(uint64_t osThreadId);

inline TraceContextTrackingInfo* GetTraceContextPointer();
inline std::uint64_t GetLocalRootSpanId() const;
Expand Down Expand Up @@ -151,6 +152,7 @@ struct ManagedThreadInfo : public IThreadInfo
#ifdef LINUX
std::int32_t _timerId;
#endif
uint64_t _blockingThreadId;
};

std::string ManagedThreadInfo::GetProfileThreadId()
Expand Down Expand Up @@ -404,6 +406,11 @@ inline void ManagedThreadInfo::SetThreadDestroyed()
_isThreadDestroyed = true;
}

// Stores osThreadId as the id of the thread currently blocking this one and returns the value
// that was stored before the call (0 is the initial value set by the constructor).
inline uint64_t ManagedThreadInfo::SetBlockingThread(uint64_t osThreadId)
{
    const uint64_t previousBlockingThreadId = _blockingThreadId;
    _blockingThreadId = osThreadId;
    return previousBlockingThreadId;
}

inline TraceContextTrackingInfo* ManagedThreadInfo::GetTraceContextPointer()
{
return &_traceContextTrackingInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class RawContentionSample : public RawSample
inline static const std::string BucketLabelName = "Duration bucket";
inline static const std::string RawCountLabelName = "raw count";
inline static const std::string RawDurationLabelName = "raw duration";
inline static const std::string BlockingThreadInfo = "blocking thread";

public:
RawContentionSample() = default;
Expand All @@ -20,7 +21,8 @@ class RawContentionSample : public RawSample
:
RawSample(std::move(other)),
ContentionDuration(other.ContentionDuration),
Bucket(std::move(other.Bucket))
Bucket(std::move(other.Bucket)),
BlockingThread(other.BlockingThread)
{
}

Expand All @@ -31,6 +33,7 @@ class RawContentionSample : public RawSample
RawSample::operator=(std::move(other));
ContentionDuration = other.ContentionDuration;
Bucket = std::move(other.Bucket);
BlockingThread = other.BlockingThread;
}
return *this;
}
Expand All @@ -46,8 +49,13 @@ class RawContentionSample : public RawSample
sample->AddNumericLabel(NumericLabel{RawCountLabelName, 1});
sample->AddNumericLabel(NumericLabel{RawDurationLabelName, static_cast<uint64_t>(ContentionDuration)});
sample->AddValue(static_cast<std::int64_t>(ContentionDuration), contentionDurationIndex);
if (BlockingThread != 0)
{
sample->AddNumericLabel(NumericLabel{BlockingThreadInfo, BlockingThread});
}
}

double ContentionDuration;
std::string Bucket;
uint64_t BlockingThread;
};
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2022 Datadog, Inc.
// </copyright>

using System;
using System.Linq;
using Datadog.Profiler.IntegrationTests.Helpers;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -34,6 +37,11 @@ public void ShouldGetContentionSamples(string appName, string framework, string
// only contention profiler enabled so should only see the 2 related values per sample
SamplesHelper.CheckSamplesValueCount(runner.Environment.PprofDir, 2);
Assert.True(SamplesHelper.IsLabelPresent(runner.Environment.PprofDir, "raw duration"));

if (framework == "net8.0")
{
AssertBlockingThreadLabel(runner.Environment.PprofDir);
}
}

[TestAppFact("Samples.Computer01", new[] { "net6.0", "net7.0", "net8.0" })]
Expand All @@ -54,6 +62,11 @@ public void ShouldContentionProfilerBeEnabledByDefault(string appName, string fr
// only contention profiler enabled so should see 2 value per sample
SamplesHelper.CheckSamplesValueCount(runner.Environment.PprofDir, 2);
Assert.NotEqual(0, SamplesHelper.GetSamplesCount(runner.Environment.PprofDir));

if (framework == "net8.0")
{
AssertBlockingThreadLabel(runner.Environment.PprofDir);
}
}

[TestAppFact("Samples.Computer01", new[] { "net6.0", "net7.0", "net8.0" })]
Expand All @@ -71,5 +84,22 @@ public void ExplicitlyDisableContentionProfiler(string appName, string framework
// only walltime profiler enabled so should see 1 value per sample
SamplesHelper.CheckSamplesValueCount(runner.Environment.PprofDir, 1);
}

// Checks that at least one lock contention sample carries a "blocking thread" label and that
// every such label holds the OS id of a thread known from the "thread id" labels of the profiles.
private static void AssertBlockingThreadLabel(string pprofDir)
{
    var threadIds = SamplesHelper.GetThreadIds(pprofDir);

    // get samples with lock-count value set and blocking thread info
    // GetSamples is a lazy iterator over the pprof files: materialize it once so that the
    // emptiness check and the loop below do not parse every profile twice.
    var contentionSamples = SamplesHelper.GetSamples(pprofDir, "lock-count")
        .Where(e => e.Labels.Any(x => x.Name == "blocking thread"))
        .ToList();

    contentionSamples.Should().NotBeEmpty();

    foreach (var (_, labels, _) in contentionSamples)
    {
        var label = labels.FirstOrDefault(l => l.Name == "blocking thread");
        label.Name.Should().NotBeNullOrWhiteSpace();
        threadIds.Should().Contain(int.Parse(label.Value), $"Unknown blocking thread id {label.Value}");
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ public static IEnumerable<Label> Labels(this Perftools.Profiles.Sample sample, P
});
}

// Returns the names of the profile's sample types, in declaration order, by resolving each
// sample type's Type index against the profile string table.
public static string[] SampleType(this Perftools.Profiles.Profile profile)
{
    var names = profile.SampleType.Select(valueType => profile.StringTable[(int)valueType.Type]);
    return names.ToArray();
}

public static StackTrace StackTrace(this Perftools.Profiles.Sample sample, Profile profile)
{
return new StackTrace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using K4os.Compression.LZ4.Streams;
using Perftools.Profiles;
using Xunit;
Expand Down Expand Up @@ -84,6 +85,29 @@ public static int GetThreadCount(string directory)
return threadNames.Count;
}

// Collects the OS thread ids found in the "thread id" labels of every sample of every profile
// stored in the given directory.
// The label value is expected to look like "<123> [#456]"; the number after '#' is the one
// that is returned.
// Throws FormatException when a "thread id" label does not follow that format.
public static HashSet<int> GetThreadIds(string directory)
{
    HashSet<int> ids = new();
    var regex = new Regex(@"<[0-9]+> \[#(?<OsId>[0-9]+)\]", RegexOptions.Compiled);
    foreach (var profile in GetProfiles(directory))
    {
        foreach (var sample in profile.Sample)
        {
            foreach (var label in sample.Labels(profile))
            {
                if (label.Name != "thread id")
                {
                    continue;
                }

                var match = regex.Match(label.Value);
                if (!match.Success)
                {
                    // Same exception type int.Parse("") used to raise, but with the offending
                    // value in the message so that a test failure is diagnosable.
                    throw new FormatException($"Unexpected 'thread id' label value '{label.Value}'");
                }

                ids.Add(int.Parse(match.Groups["OsId"].Value));
            }
        }
    }

    return ids;
}

public static bool IsLabelPresent(string directory, string labelName)
{
foreach (var profile in GetProfiles(directory))
Expand All @@ -109,13 +133,24 @@ public static bool IsLabelPresent(string directory, string labelName)
return true;
}

// Lazily enumerates the samples of every profile stored in the given directory as
// (stack trace, resolved labels, values) tuples.
// When sampleTypeFilter is provided, only the samples whose value for that sample type is
// non-zero are returned; profiles that do not declare this sample type contribute nothing.
// When sampleTypeFilter is null, all samples are returned.
internal static IEnumerable<(StackTrace StackTrace, PprofHelper.Label[] Labels, long[] Values)> GetSamples(string directory, string sampleTypeFilter = null)
{
    foreach (var profile in GetProfiles(directory))
    {
        var sampleTypeIdx = -1;
        if (sampleTypeFilter != null)
        {
            sampleTypeIdx = Array.IndexOf(profile.SampleType(), sampleTypeFilter);
            if (sampleTypeIdx == -1)
            {
                // the requested sample type is not part of this profile: no sample can match
                continue;
            }
        }

        foreach (var sample in profile.Sample)
        {
            var values = sample.Value.ToArray();
            if (sampleTypeFilter == null || values[sampleTypeIdx] != 0)
            {
                yield return (sample.StackTrace(profile), GetLabels(profile, sample).ToArray(), values);
            }
        }
    }
}
Expand Down Expand Up @@ -150,6 +185,11 @@ public static bool IsLabelPresent(string directory, string labelName)
.Select(s => (s.Type, s.Message, s.Count, s.Stacktrace));
}

// Resolves the labels of the given sample against the string table of its profile.
private static IEnumerable<PprofHelper.Label> GetLabels(Profile profile, Sample sample)
    => sample.Labels(profile);

private static bool HaveSamplesValueCount(string directory, int valuesCount)
{
foreach (var profile in GetProfiles(directory))
Expand Down

0 comments on commit 338dd13

Please sign in to comment.