Shrink ElementWrapper to just the pointer and other optimizations.
Summary:
The per-thread ElementWrapper array can be very sparse, and for a process
where num_threads * num_thread_local_objects is large it can produce
significant memory overhead. By moving all the auxiliary information from
ElementWrapper into the more compact ThreadEntrySet, we can eliminate 8 bytes
per wrapper (sizeof(ElementWrapper) goes from 16 bytes to 8).
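
A minimal sketch of the layout change, using simplified stand-ins rather than
the exact folly definitions (sizes assume a typical 64-bit platform):

  #include <cstdint>

  // Before: every slot in a thread's sparse per-id elements array carried
  // both the instance pointer and its type-erased deleter.
  struct ElementWrapperBefore {
    void* ptr;
    std::uintptr_t deleter;
  };

  // After: the per-thread slot keeps only the pointer; the dispose info is
  // stored once per (thread, id) pair inside the ThreadEntrySet instead.
  struct ElementWrapperAfter {
    void* ptr;
  };

  static_assert(sizeof(ElementWrapperBefore) == 16, "old per-slot cost");
  static_assert(sizeof(ElementWrapperAfter) == 8, "new per-slot cost");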

As an added bonus, accessAllThreads() can now use the copy of the per-thread
instance pointer stashed in ThreadEntrySet instead of reading a thread's
elements array directly. The lock protecting the ThreadEntrySet is already
held in the accessor, so the accessor no longer needs to also hold the
StaticMeta's lock_. The meta.lock_ was otherwise held to prevent a thread's
elements array from being resized while the accessor might be reading from
it.
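
A hedged usage sketch of the path this affects; Counter, CounterTag, and the
worker scaffolding are illustrative and not part of folly, and the sketch
assumes a build that links folly:

  #include <folly/ThreadLocal.h>

  #include <atomic>
  #include <cstdio>
  #include <thread>
  #include <vector>

  struct Counter {
    std::atomic<int> hits{0};
  };
  struct CounterTag {}; // accessAllThreads() requires a unique, non-default tag.

  folly::ThreadLocal<Counter, CounterTag> perThreadCounter;

  int main() {
    constexpr int kWorkers = 4;
    std::atomic<int> ready{0};
    std::atomic<bool> stop{false};
    std::vector<std::thread> workers;
    for (int i = 0; i < kWorkers; ++i) {
      workers.emplace_back([&] {
        // The first dereference lazily creates this thread's Counter.
        perThreadCounter->hits.fetch_add(1, std::memory_order_relaxed);
        ready.fetch_add(1, std::memory_order_release);
        // Keep the thread (and its instance) alive while main iterates.
        while (!stop.load(std::memory_order_acquire)) {
          std::this_thread::yield();
        }
      });
    }
    while (ready.load(std::memory_order_acquire) < kWorkers) {
      std::this_thread::yield();
    }

    // The accessor holds the per-id ThreadEntrySet lock for the duration of
    // the iteration; with this change each dereference reads the instance
    // pointer stashed in that set rather than the owning thread's elements
    // array, so StaticMeta's lock_ no longer needs to be taken here.
    int total = 0;
    for (auto& counter : perThreadCounter.accessAllThreads()) {
      total += counter.hits.load(std::memory_order_relaxed);
    }
    std::printf("total hits = %d\n", total); // expect 4

    stop.store(true, std::memory_order_release);
    for (auto& w : workers) {
      w.join();
    }
    return 0;
  }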

Reviewed By: yfeldblum

Differential Revision: D58916891

fbshipit-source-id: fe0594a9706ded322c2ab6d4d461dba124733fbb
Nitin Garg authored and facebook-github-bot committed Aug 16, 2024
1 parent 0daffea commit 10dfe0d
Showing 3 changed files with 333 additions and 246 deletions.
38 changes: 12 additions & 26 deletions folly/ThreadLocal.h
@@ -251,7 +251,6 @@ class ThreadLocalPtr {
threadlocal_detail::StaticMetaBase& meta_;
SharedMutex* accessAllThreadsLock_;
SharedMutex* forkHandlerLock_;
std::mutex* lock_;
uint32_t id_;

// Prevent the entry set from changing while we are iterating over it.
@@ -267,7 +266,7 @@ class ThreadLocalPtr {
class Iterator {
friend class Accessor;
const Accessor* accessor_{nullptr};
using InnerVector = threadlocal_detail::ThreadEntrySet::EntryVector;
using InnerVector = threadlocal_detail::ThreadEntrySet::ElementVector;
using InnerIterator = InnerVector::iterator;

InnerVector& vec_;
@@ -288,12 +287,10 @@ class ThreadLocalPtr {
}

const T& dereference() const {
return *static_cast<T*>((*iter_)->elements[accessor_->id_].ptr);
return *static_cast<T*>(iter_->wrapper.ptr);
}

T& dereference() {
return *static_cast<T*>((*iter_)->elements[accessor_->id_].ptr);
}
T& dereference() { return *static_cast<T*>(iter_->wrapper.ptr); }

bool equal(const Iterator& other) const {
return (accessor_->id_ == other.accessor_->id_ && iter_ == other.iter_);
@@ -303,7 +300,7 @@ class ThreadLocalPtr {

explicit Iterator(const Accessor* accessor, bool toEnd = false)
: accessor_(accessor),
vec_(accessor_->wlockedThreadEntrySet_->threadEntries),
vec_(accessor_->wlockedThreadEntrySet_->threadElements),
iter_(vec_.begin()) {
if (toEnd) {
setToEnd();
@@ -314,9 +311,7 @@ class ThreadLocalPtr {

// we just need to check the ptr since it can be set to nullptr
// even if the entry is part of the list
bool valid() const {
return (iter_ != vec_.end() && (*iter_)->elements[accessor_->id_].ptr);
}
bool valid() const { return (iter_ != vec_.end() && iter_->wrapper.ptr); }

void incrementToValid() {
for (; iter_ != vec_.end() && !valid(); ++iter_) {
@@ -371,9 +366,9 @@ class ThreadLocalPtr {

bool operator!=(Iterator const& rhs) const { return !equal(rhs); }

std::thread::id getThreadId() const { return (*iter_)->tid(); }
std::thread::id getThreadId() const { return iter_->threadEntry->tid(); }

uint64_t getOSThreadId() const { return (*iter_)->tid_os; }
uint64_t getOSThreadId() const { return iter_->threadEntry->tid_os; }
};

~Accessor() { release(); }
@@ -389,12 +384,10 @@ class ThreadLocalPtr {
: meta_(other.meta_),
accessAllThreadsLock_(other.accessAllThreadsLock_),
forkHandlerLock_(other.forkHandlerLock_),
lock_(other.lock_),
id_(other.id_) {
other.id_ = 0;
other.accessAllThreadsLock_ = nullptr;
other.forkHandlerLock_ = nullptr;
other.lock_ = nullptr;
wlockedThreadEntrySet_ = std::move(other.wlockedThreadEntrySet_);
}

@@ -406,11 +399,9 @@ class ThreadLocalPtr {
// which is impossible, which leaves only one possible scenario --
// *this is empty. Assert it.
assert(&meta_ == &other.meta_);
assert(lock_ == nullptr);
using std::swap;
swap(accessAllThreadsLock_, other.accessAllThreadsLock_);
swap(forkHandlerLock_, other.forkHandlerLock_);
swap(lock_, other.lock_);
swap(id_, other.id_);
wlockedThreadEntrySet_.unlock();
swap(wlockedThreadEntrySet_, other.wlockedThreadEntrySet_);
@@ -420,33 +411,28 @@ class ThreadLocalPtr {
: meta_(threadlocal_detail::StaticMeta<Tag, AccessMode>::instance()),
accessAllThreadsLock_(nullptr),
forkHandlerLock_(nullptr),
lock_(nullptr),
id_(0) {}

private:
explicit Accessor(uint32_t id)
: meta_(threadlocal_detail::StaticMeta<Tag, AccessMode>::instance()),
accessAllThreadsLock_(&meta_.accessAllThreadsLock_),
forkHandlerLock_(&meta_.forkHandlerLock_),
lock_(&meta_.lock_) {
forkHandlerLock_(&meta_.forkHandlerLock_) {
forkHandlerLock_->lock_shared();
accessAllThreadsLock_->lock();
id_ = id;
wlockedThreadEntrySet_ = meta_.allId2ThreadEntrySets_[id_].wlock();
lock_->lock();
}

void release() {
if (lock_) {
lock_->unlock();
DCHECK(accessAllThreadsLock_ != nullptr);
if (accessAllThreadsLock_) {
id_ = 0;
accessAllThreadsLock_->unlock();
accessAllThreadsLock_ = nullptr;
DCHECK(forkHandlerLock_ != nullptr);
forkHandlerLock_->unlock_shared();
id_ = 0;
lock_ = nullptr;
accessAllThreadsLock_ = nullptr;
forkHandlerLock_ = nullptr;
wlockedThreadEntrySet_.unlock();
}
}
};
157 changes: 78 additions & 79 deletions folly/detail/ThreadLocalDetail.cpp
@@ -41,28 +41,28 @@ void SharedPtrDeleter::operator()(
ts_.reset();
}

uintptr_t ElementWrapper::castForgetAlign(DeleterFunType* f) noexcept {
uintptr_t ElementDisposeInfo::castForgetAlign(DeleterFunType* f) noexcept {
auto const p = reinterpret_cast<char const*>(f);
auto const q = std::launder(p);
return reinterpret_cast<uintptr_t>(q);
}

bool ThreadEntrySet::basicSanity() const {
return //
threadEntries.size() == entryToVectorSlot.size() &&
threadElements.size() == entryToVectorSlot.size() &&
std::all_of(
entryToVectorSlot.begin(),
entryToVectorSlot.end(),
[&](auto const& kvp) {
return kvp.second < threadEntries.size() &&
threadEntries[kvp.second] == kvp.first;
return kvp.second < threadElements.size() &&
threadElements[kvp.second].threadEntry == kvp.first;
});
}

void ThreadEntrySet::compress() {
assert(compressible());
// compress the vector
threadEntries.shrink_to_fit();
threadElements.shrink_to_fit();
// compress the index
EntryIndex newIndex;
newIndex.reserve(entryToVectorSlot.size());
@@ -131,37 +131,32 @@ void StaticMetaBase::onThreadExit(void* ptr) {
// ThreadLocal B destructor.
pthread_setspecific(meta.pthreadKey_, threadEntry);

std::shared_lock forkRlock(meta.forkHandlerLock_);
std::shared_lock rlock(meta.accessAllThreadsLock_, std::defer_lock);
if (meta.strict_) {
rlock.lock();
}
meta.removeThreadEntryFromAllInMap(threadEntry);
forkRlock.unlock();
{
std::lock_guard<std::mutex> g(meta.lock_);
// mark it as removed
threadEntry->removed_ = true;
auto elementsCapacity = threadEntry->getElementsCapacity();
auto beforeCount = meta.totalElementWrappers_.fetch_sub(elementsCapacity);
DCHECK_GE(beforeCount, elementsCapacity);
// No need to hold the lock any longer; the ThreadEntry is private to this
// thread now that it's been removed from meta.
}
// NOTE: User-provided deleter / object dtor itself may be using ThreadLocal
// with the same Tag, so dispose() calls below may (re)create some of the
// elements or even increase elementsCapacity, thus multiple cleanup rounds
// may be required.
auto elementsCapacity = threadEntry->getElementsCapacity();
auto beforeCount = meta.totalElementWrappers_.fetch_sub(elementsCapacity);
DCHECK_GE(beforeCount, elementsCapacity);
for (bool shouldRun = true; shouldRun;) {
std::shared_lock forkRlock(meta.forkHandlerLock_);
std::shared_lock rlock(meta.accessAllThreadsLock_, std::defer_lock);
if (meta.strict_) {
rlock.lock();
}
// NOTE: User-provided deleter / object dtor itself may be using
// ThreadLocal with the same Tag, so dispose() calls below may (re)create
// some of the elements or even increase elementsCapacity, thus multiple
// cleanup rounds may be required.
std::vector<ElementDisposeInfo> elements;
shouldRun = false;
auto elementsCapacity = threadEntry->getElementsCapacity();
FOR_EACH_RANGE (i, 0, elementsCapacity) {
if (threadEntry->elements[i].dispose(TLPDestructionMode::THIS_THREAD)) {
threadEntry->elements[i].cleanup();
meta.removeThreadEntryAndCollectDisposeInfos(threadEntry, elements);
forkRlock.unlock();
for (auto& e : elements) {
if (e.dispose(TLPDestructionMode::THIS_THREAD)) {
e.cleanup();
shouldRun = true;
}
}
DCHECK(meta.isThreadEntryRemovedFromAllInMap(threadEntry, !meta.strict_));
DCHECK(
shouldRun ||
meta.isThreadEntryRemovedFromAllInMap(threadEntry, !meta.strict_));
}
pthread_setspecific(meta.pthreadKey_, nullptr);
}
@@ -187,17 +182,19 @@ void StaticMetaBase::cleanupThreadEntriesAndList(
while (tmp) {
auto& meta = *tmp->meta;
pthread_setspecific(meta.pthreadKey_, tmp);
std::shared_lock rlock(meta.accessAllThreadsLock_, std::defer_lock);
if (meta.strict_) {
rlock.lock();
}

for (bool shouldRunInner = true; shouldRunInner;) {
std::shared_lock forkRlock(meta.forkHandlerLock_);
std::shared_lock rlock(meta.accessAllThreadsLock_, std::defer_lock);
if (meta.strict_) {
rlock.lock();
}
shouldRunInner = false;
auto elementsCapacity = tmp->getElementsCapacity();
FOR_EACH_RANGE (i, 0, elementsCapacity) {
if (tmp->elements[i].dispose(TLPDestructionMode::THIS_THREAD)) {
tmp->elements[i].cleanup();
std::vector<ElementDisposeInfo> elements;
meta.removeThreadEntryAndCollectDisposeInfos(tmp, elements);
forkRlock.unlock();
for (auto& e : elements) {
if (e.dispose(TLPDestructionMode::THIS_THREAD)) {
e.cleanup();
shouldRunInner = true;
shouldRunOuter = true;
}
@@ -256,12 +253,40 @@ uint32_t StaticMetaBase::allocate(EntryID* ent) {
return id;
}

ThreadEntrySet StaticMetaBase::popThreadEntrySetAndClearElementPtrs(
uint32_t id) {
ThreadEntrySet tmp;
auto wlocked = allId2ThreadEntrySets_[id].wlock();
std::swap(*wlocked, tmp);
std::lock_guard<std::mutex> g(lock_);
for (auto& e : tmp.threadElements) {
auto elementsCapacity = e.threadEntry->getElementsCapacity();
if (id < elementsCapacity) {
/*
* Writing another thread's ThreadEntry from here is fine;
* The TL object is being destroyed, so get(id), or reset()
* or accessAllThreads calls on it are illegal. Only other
* racing accesses would be from the owner thread itself
* either a) reallocating the elements array (guarded by lock_, so safe)
* or b) exiting and trying to clear the elements array or free the
* elements and ThreadEntry itself. The ThreadEntrySet lock synchronizes
* this part as the exiting thread will acquire it to remove itself from
* the set.
*/
e.threadEntry->elements[id].ptr = nullptr;
}
// Destroy should not access thread entry after this call as racing
// exit call can make it invalid.
e.threadEntry = nullptr;
}
return tmp;
}

void StaticMetaBase::destroy(EntryID* ent) {
try {
auto& meta = *this;

// Elements in other threads that use this id.
std::vector<ElementWrapper> elements;
ThreadEntrySet tmpEntrySet;

{
@@ -283,39 +308,18 @@ void StaticMetaBase::destroy(EntryID* ent) {
if (id == kEntryIDInvalid) {
return;
}
meta.allId2ThreadEntrySets_[id].swap(tmpEntrySet);
tmpEntrySet = meta.popThreadEntrySetAndClearElementPtrs(id);
forkRlock.unlock();

{
std::lock_guard<std::mutex> g(meta.lock_);
for (auto& e : tmpEntrySet.threadEntries) {
auto elementsCapacity = e->getElementsCapacity();
if (id < elementsCapacity) {
if (e->elements[id].ptr) {
elements.push_back(e->elements[id]);
/*
* Writing another thread's ThreadEntry from here is fine;
* the only other potential reader is the owning thread --
* from onThreadExit (which grabs the lock, so is properly
* synchronized with us) or from get(), which also grabs
* the lock if it needs to resize the elements vector.
*
* We can't conflict with reads for a get(id), because
* it's illegal to call get on a thread local that's
* destructing.
*/
e->elements[id].ptr = nullptr;
e->elements[id].deleter = 0;
}
}
}
meta.freeIds_.push_back(id);
}
}
// Delete elements outside the locks.
for (ElementWrapper& elem : elements) {
if (elem.dispose(TLPDestructionMode::ALL_THREADS)) {
elem.cleanup();
for (auto& e : tmpEntrySet.threadElements) {
if (e.wrapper.dispose(TLPDestructionMode::ALL_THREADS)) {
e.wrapper.cleanup();
}
}
} catch (...) { // Just in case we get a lock error or something anyway...
@@ -444,19 +448,14 @@ void StaticMetaBase::reserve(EntryID* id) {
*/
void* ThreadEntry::releaseElement(uint32_t id) {
auto rlocked = meta->allId2ThreadEntrySets_[id].rlock();
return elements[id].release();
}

/*
* Cleanup the element. Caller is holding rlock on the ThreadEntrySet
* corresponding to the id. Running destructors of user objects isn't ideal
* under lock but this is the historical behavior. It should be possible to
* restructure this if a need for it arises.
*/
void ThreadEntry::cleanupElement(uint32_t id) {
elements[id].dispose(TLPDestructionMode::THIS_THREAD);
// Cleanup
elements[id].cleanup();
auto slot = rlocked->getIndexFor(this);
if (slot < 0) {
return nullptr;
}
auto ptr = rlocked.asNonConstUnsafe().threadElements[slot].wrapper.release();
DCHECK_EQ(ptr, elements[id].ptr);
elements[id].ptr = nullptr;
return ptr;
}

FOLLY_STATIC_CTOR_PRIORITY_MAX
