Skip to content

Commit

Permalink
Backport envoyproxy#33395
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishb-solo committed Aug 12, 2024
1 parent a7fbe6d commit 98114a2
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 9 deletions.
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ RUNTIME_GUARD(envoy_reloadable_features_validate_connect);
RUNTIME_GUARD(envoy_reloadable_features_validate_detailed_override_host_statuses);
RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status);
RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers);
RUNTIME_GUARD(envoy_restart_features_allow_slot_destroy_on_worker_threads);
RUNTIME_GUARD(envoy_restart_features_explicit_wildcard_resource);
RUNTIME_GUARD(envoy_restart_features_remove_runtime_singleton);
RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams);
Expand Down
1 change: 1 addition & 0 deletions source/common/thread_local/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:stl_helpers",
"//source/common/runtime:runtime_features_lib",
],
)
41 changes: 41 additions & 0 deletions source/common/thread_local/thread_local_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@

#include "source/common/common/assert.h"
#include "source/common/common/stl_helpers.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace ThreadLocal {

thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;

// Captures the value of the
// "envoy.restart_features.allow_slot_destroy_on_worker_threads" runtime feature once, at
// construction time. SlotImpl's destructor consults the cached flag rather than querying the
// runtime again.
InstanceImpl::InstanceImpl()
    : allow_slot_destroy_on_worker_threads_(Runtime::runtimeFeatureEnabled(
          "envoy.restart_features.allow_slot_destroy_on_worker_threads")) {}

InstanceImpl::~InstanceImpl() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(shutdown_);
Expand All @@ -41,6 +47,41 @@ SlotPtr InstanceImpl::allocateSlot() {
// Creates a slot that refers back to `parent` and records the slot `index` it occupies.
// The still-alive guard starts out as a freshly allocated shared `true` flag; see the header
// file comments on still_alive_guard_ for how it is used.
InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
: parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}

// Releases this slot from the parent InstanceImpl.
//
// The path taken depends on the cached value of the
// "envoy.restart_features.allow_slot_destroy_on_worker_threads" runtime feature:
//  - Feature disabled: the slot is removed synchronously on the calling thread (the original
//    behavior).
//  - Feature enabled: nothing is done once the parent has shut down; otherwise the slot is
//    removed immediately when no main dispatcher is registered yet or when running on the main
//    thread, and the removal is posted to the main dispatcher in every other case.
InstanceImpl::SlotImpl::~SlotImpl() {
  // If the runtime feature is disabled then keep the original behavior. This should
  // be cleaned up when the runtime feature
  // "envoy.restart_features.allow_slot_destroy_on_worker_threads" is deprecated.
  if (!parent_.allow_slot_destroy_on_worker_threads_) {
    parent_.removeSlot(index_);
    return;
  }

  // Do nothing if the parent is already shutdown. Return early here to avoid accessing the main
  // thread dispatcher because it may have been destroyed.
  if (isShutdown()) {
    return;
  }

  auto* main_thread_dispatcher = parent_.main_thread_dispatcher_;
  // The runtime feature is known to be enabled at this point (the disabled case returned
  // above), so only the dispatcher state decides which path to take.
  // Main thread dispatcher may be nullptr if the slot is being created and destroyed during
  // server initialization.
  if (main_thread_dispatcher == nullptr || main_thread_dispatcher->isThreadSafe()) {
    // If the slot is being destroyed on the main thread, we can remove it immediately.
    parent_.removeSlot(index_);
  } else {
    // If the slot is being destroyed on a worker thread, we need to post the removal to the
    // main thread. There are two possible cases here:
    // 1. The removal is executed on the main thread as expected if the main dispatcher is still
    //    active. This is the common case and the clean up will be done as expected because the
    //    worker dispatchers must be active before the main dispatcher is exited.
    // 2. The removal is not executed if the main dispatcher has already exited. This is fine
    //    because the removal has no side effect and will be ignored. The shutdown process will
    //    clean up all the slots anyway.
    // The index is copied into the callback because this SlotImpl is gone by the time the
    // callback runs; the parent is captured by reference.
    main_thread_dispatcher->post([i = index_, &tls = parent_] { tls.removeSlot(i); });
  }
}

std::function<void()> InstanceImpl::SlotImpl::wrapCallback(const std::function<void()>& cb) {
// See the header file comments for still_alive_guard_ for the purpose of this capture and the
// expired check below.
Expand Down
5 changes: 4 additions & 1 deletion source/common/thread_local/thread_local_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace ThreadLocal {
*/
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
public:
InstanceImpl();
~InstanceImpl() override;

// ThreadLocal::Instance
Expand All @@ -35,7 +36,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
// slot as callbacks drain from workers.
struct SlotImpl : public Slot {
SlotImpl(InstanceImpl& parent, uint32_t index);
~SlotImpl() override { parent_.removeSlot(index_); }
~SlotImpl() override;
std::function<void()> wrapCallback(const std::function<void()>& cb);
std::function<void()> dataCallback(const UpdateCb& cb);
static bool currentThreadRegisteredWorker(uint32_t index);
Expand Down Expand Up @@ -86,6 +87,8 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
Event::Dispatcher* main_thread_dispatcher_{};
std::atomic<bool> shutdown_{};

bool allow_slot_destroy_on_worker_threads_{};

// Test only.
friend class ThreadLocalInstanceImplTest;
};
Expand Down
10 changes: 6 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add
ENVOY_LOG(info, " {}: {}", ext.first, absl::StrJoin(ext.second->registeredNames(), ", "));
}

// The main thread is also registered for thread local updates so that code that does not care
// whether it runs on the main thread or on workers can still use TLS.
// We do this as early as possible because it has no side effects and ensures that the TLS
// always contains a valid main thread dispatcher whenever TLS is used.
thread_local_.registerThread(*dispatcher_, true);

// Handle configuration that needs to take place prior to the main configuration load.
InstanceUtil::loadBootstrapConfig(bootstrap_, options_,
messageValidationContext().staticValidationVisitor(), *api_);
Expand Down Expand Up @@ -647,10 +653,6 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add
listener_manager_ = listener_manager_factory->createListenerManager(
*this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_);

// The main thread is also registered for thread local updates so that code that does not care
// whether it runs on the main thread or on workers can still use TLS.
thread_local_.registerThread(*dispatcher_, true);

// We can now initialize stats for threading.
stats_store_.initializeThreading(*dispatcher_, thread_local_);

Expand Down
1 change: 1 addition & 0 deletions test/common/thread_local/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ envoy_cc_test(
"//source/common/stats:isolated_store_lib",
"//source/common/thread_local:thread_local_lib",
"//test/mocks/event:event_mocks",
"//test/test_common:test_runtime_lib",
],
)
100 changes: 96 additions & 4 deletions test/common/thread_local/thread_local_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
#include "source/common/thread_local/thread_local_impl.h"

#include "test/mocks/event/mocks.h"
#include "test/test_common/test_runtime.h"

#include "gmock/gmock.h"

using testing::_;
using testing::InSequence;
using testing::NiceMock;
using testing::Ref;
using testing::Return;
using testing::ReturnPointee;

namespace Envoy {
Expand Down Expand Up @@ -52,10 +55,14 @@ class TestThreadLocalObject : public ThreadLocalObject {
class ThreadLocalInstanceImplTest : public testing::Test {
public:
// Fixture setup: registers a mock worker dispatcher and then the mock main dispatcher with the
// TLS instance under test.
ThreadLocalInstanceImplTest() {
// Make the mock main dispatcher report that the caller is on its thread, so slots destroyed by
// the tests are removed immediately rather than posted.
EXPECT_CALL(main_dispatcher_, isThreadSafe()).WillRepeatedly(Return(true));

// Registering the worker is expected to post exactly one callback to the worker dispatcher.
EXPECT_CALL(thread_dispatcher_, post(_));
tls_.registerThread(thread_dispatcher_, false);
// Register the main thread after the worker thread to ensure that the
// thread_local_data_.dispatcher_ of current test thread is set to the main thread dispatcher.
tls_.registerThread(main_dispatcher_, true);
EXPECT_EQ(&main_dispatcher_, &tls_.dispatcher());
}

MOCK_METHOD(ThreadLocalObjectSharedPtr, createThreadLocal, (Event::Dispatcher & dispatcher));
Expand All @@ -75,8 +82,8 @@ class ThreadLocalInstanceImplTest : public testing::Test {
int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); }
InstanceImpl tls_;

Event::MockDispatcher main_dispatcher_{"test_main_thread"};
Event::MockDispatcher thread_dispatcher_{"test_worker_thread"};
NiceMock<Event::MockDispatcher> main_dispatcher_{"test_main_thread"};
NiceMock<Event::MockDispatcher> thread_dispatcher_{"test_worker_thread"};
};

TEST_F(ThreadLocalInstanceImplTest, All) {
Expand Down Expand Up @@ -343,5 +350,90 @@ TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) {
tls.shutdownThread();
}

// Destroys a slot from a real worker thread without overriding the runtime feature and checks
// that the slot's destructor hands the removal to the main dispatcher through post().
TEST(ThreadLocalInstanceImplDispatcherTest, DestroySlotOnWorker) {
InstanceImpl tls;

Api::ApiPtr api = Api::createApiForTest();
// The main dispatcher is a mock so that isThreadSafe() and post() can be controlled and
// verified; the worker dispatcher is a real one that is run on the spawned thread.
Event::MockDispatcher main_dispatcher{"test_main_thread"};
Event::DispatcherPtr thread_dispatcher(api->allocateDispatcher("test_worker_thread"));

tls.registerThread(main_dispatcher, true);
tls.registerThread(*thread_dispatcher, false);

// Verify we have the expected dispatcher for the main thread.
EXPECT_EQ(&main_dispatcher, &tls.dispatcher());

auto slot = TypedSlot<>::makeUnique(tls);

Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread(
[&main_dispatcher, &thread_dispatcher, &tls, &slot]() {
// Ensure that the dispatcher update in tls posted during the above registerThread happens.
thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
// Verify we have the expected dispatcher for the new thread.
EXPECT_EQ(thread_dispatcher.get(), &tls.dispatcher());

// Skip the asserts in the thread, because the mock dispatcher will call
// callbacks directly in the current thread and make ASSERT_IS_MAIN_OR_TEST_THREAD fail.
Thread::SkipAsserts skip;

// Report "not on the main thread" so the destructor takes the posting path.
EXPECT_CALL(main_dispatcher, isThreadSafe()).WillOnce(Return(false));
// Destroy the slot on worker thread and expect the post() of main dispatcher to be called.
EXPECT_CALL(main_dispatcher, post(_));

slot.reset();

thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
});
thread->join();

// Verify we still have the expected dispatcher for the main thread.
EXPECT_EQ(&main_dispatcher, &tls.dispatcher());

tls.shutdownGlobalThreading();
tls.shutdownThread();
}

// Same scenario as DestroySlotOnWorker, but with the runtime feature forced off: destroying the
// slot on the worker thread must not post anything to the main dispatcher.
TEST(ThreadLocalInstanceImplDispatcherTest, DestroySlotOnWorkerButDisableRuntimeFeature) {
// The override is applied before the InstanceImpl is constructed, because the constructor is
// where the feature value is read.
TestScopedRuntime runtime;
runtime.mergeValues({{"envoy.restart_features.allow_slot_destroy_on_worker_threads", "false"}});

InstanceImpl tls;

Api::ApiPtr api = Api::createApiForTest();
Event::MockDispatcher main_dispatcher{"test_main_thread"};
Event::DispatcherPtr thread_dispatcher(api->allocateDispatcher("test_worker_thread"));

tls.registerThread(main_dispatcher, true);
tls.registerThread(*thread_dispatcher, false);

// Verify we have the expected dispatcher for the main thread.
EXPECT_EQ(&main_dispatcher, &tls.dispatcher());

auto slot = TypedSlot<>::makeUnique(tls);

Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread(
[&main_dispatcher, &thread_dispatcher, &tls, &slot]() {
// Ensure that the dispatcher update in tls posted during the above registerThread happens.
thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
// Verify we have the expected dispatcher for the new thread.
EXPECT_EQ(thread_dispatcher.get(), &tls.dispatcher());

// Skip the asserts in the thread.
Thread::SkipAsserts skip;
// Destroying the slot on the worker thread will not call post() of the main dispatcher.
EXPECT_CALL(main_dispatcher, post(_)).Times(0);
slot.reset();

thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock);
});
thread->join();

// Verify we still have the expected dispatcher for the main thread.
EXPECT_EQ(&main_dispatcher, &tls.dispatcher());

tls.shutdownGlobalThreading();
tls.shutdownThread();
}

} // namespace ThreadLocal
} // namespace Envoy

0 comments on commit 98114a2

Please sign in to comment.