From 98114a2ccdae0824613b7d304d6adb22ead5c9fb Mon Sep 17 00:00:00 2001 From: Ashish Banerjee Date: Mon, 12 Aug 2024 18:01:33 +0000 Subject: [PATCH] Backport https://github.com/envoyproxy/envoy/pull/33395 --- source/common/runtime/runtime_features.cc | 1 + source/common/thread_local/BUILD | 1 + .../common/thread_local/thread_local_impl.cc | 41 +++++++ .../common/thread_local/thread_local_impl.h | 5 +- source/server/server.cc | 10 +- test/common/thread_local/BUILD | 1 + .../thread_local/thread_local_impl_test.cc | 100 +++++++++++++++++- 7 files changed, 150 insertions(+), 9 deletions(-) diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index dafe75ac2901..e1e534166c75 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -92,6 +92,7 @@ RUNTIME_GUARD(envoy_reloadable_features_validate_connect); RUNTIME_GUARD(envoy_reloadable_features_validate_detailed_override_host_statuses); RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status); RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers); +RUNTIME_GUARD(envoy_restart_features_allow_slot_destroy_on_worker_threads); RUNTIME_GUARD(envoy_restart_features_explicit_wildcard_resource); RUNTIME_GUARD(envoy_restart_features_remove_runtime_singleton); RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams); diff --git a/source/common/thread_local/BUILD b/source/common/thread_local/BUILD index 3e1697e39785..726cd13c29a6 100644 --- a/source/common/thread_local/BUILD +++ b/source/common/thread_local/BUILD @@ -18,5 +18,6 @@ envoy_cc_library( "//source/common/common:assert_lib", "//source/common/common:minimal_logger_lib", "//source/common/common:stl_helpers", + "//source/common/runtime:runtime_features_lib", ], ) diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index 88981133cff0..bd417164b103 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -9,12 +9,18 @@ #include "source/common/common/assert.h" #include "source/common/common/stl_helpers.h" +#include "source/common/runtime/runtime_features.h" namespace Envoy { namespace ThreadLocal { thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_; +InstanceImpl::InstanceImpl() { + allow_slot_destroy_on_worker_threads_ = + Runtime::runtimeFeatureEnabled("envoy.restart_features.allow_slot_destroy_on_worker_threads"); +} + InstanceImpl::~InstanceImpl() { ASSERT_IS_MAIN_OR_TEST_THREAD(); ASSERT(shutdown_); @@ -41,6 +47,41 @@ SlotPtr InstanceImpl::allocateSlot() { InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) : parent_(parent), index_(index), still_alive_guard_(std::make_shared(true)) {} +InstanceImpl::SlotImpl::~SlotImpl() { + // If the runtime feature is disabled then keep the original behavior. This should + // be cleaned up when the runtime feature + // "envoy.restart_features.allow_slot_destroy_on_worker_threads" is deprecated. + if (!parent_.allow_slot_destroy_on_worker_threads_) { + parent_.removeSlot(index_); + return; + } + + // Do nothing if the parent is already shutdown. Return early here to avoid accessing the main + // thread dispatcher because it may have been destroyed. + if (isShutdown()) { + return; + } + + auto* main_thread_dispatcher = parent_.main_thread_dispatcher_; + // Main thread dispatcher may be nullptr if the slot is being created and destroyed during + // server initialization. + if (!parent_.allow_slot_destroy_on_worker_threads_ || main_thread_dispatcher == nullptr || + main_thread_dispatcher->isThreadSafe()) { + // If the slot is being destroyed on the main thread, we can remove it immediately. + parent_.removeSlot(index_); + } else { + // If the slot is being destroyed on a worker thread, we need to post the removal to the + // main thread. There are two possible cases here: + // 1. The removal is executed on the main thread as expected if the main dispatcher is still + // active. This is the common case and the clean up will be done as expected because the + // the worker dispatchers must be active before the main dispatcher is exited. + // 2. The removal is not executed if the main dispatcher has already exited. This is fine + // because the removal has no side effect and will be ignored. The shutdown process will + // clean up all the slots anyway. + main_thread_dispatcher->post([i = index_, &tls = parent_] { tls.removeSlot(i); }); + } +} + std::function InstanceImpl::SlotImpl::wrapCallback(const std::function& cb) { // See the header file comments for still_alive_guard_ for the purpose of this capture and the // expired check below. diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 25eebe4621fd..90753101b619 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -19,6 +19,7 @@ namespace ThreadLocal { */ class InstanceImpl : Logger::Loggable, public NonCopyable, public Instance { public: + InstanceImpl(); ~InstanceImpl() override; // ThreadLocal::Instance @@ -35,7 +36,7 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub // slot as callbacks drain from workers. struct SlotImpl : public Slot { SlotImpl(InstanceImpl& parent, uint32_t index); - ~SlotImpl() override { parent_.removeSlot(index_); } + ~SlotImpl() override; std::function wrapCallback(const std::function& cb); std::function dataCallback(const UpdateCb& cb); static bool currentThreadRegisteredWorker(uint32_t index); @@ -86,6 +87,8 @@ class InstanceImpl : Logger::Loggable, public NonCopyable, pub Event::Dispatcher* main_thread_dispatcher_{}; std::atomic shutdown_{}; + bool allow_slot_destroy_on_worker_threads_{}; + // Test only. friend class ThreadLocalInstanceImplTest; }; diff --git a/source/server/server.cc b/source/server/server.cc index b8c2ca1ade67..48fab3f56489 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -417,6 +417,12 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add ENVOY_LOG(info, " {}: {}", ext.first, absl::StrJoin(ext.second->registeredNames(), ", ")); } + // The main thread is also registered for thread local updates so that code that does not care + // whether it runs on the main thread or on workers can still use TLS. + // We do this as early as possible because this has no side effect and could ensure that the + // TLS always contains a valid main thread dispatcher when TLS is used. + thread_local_.registerThread(*dispatcher_, true); + // Handle configuration that needs to take place prior to the main configuration load. InstanceUtil::loadBootstrapConfig(bootstrap_, options_, messageValidationContext().staticValidationVisitor(), *api_); @@ -647,10 +653,6 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add listener_manager_ = listener_manager_factory->createListenerManager( *this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_); - // The main thread is also registered for thread local updates so that code that does not care - // whether it runs on the main thread or on workers can still use TLS. - thread_local_.registerThread(*dispatcher_, true); - // We can now initialize stats for threading. stats_store_.initializeThreading(*dispatcher_, thread_local_); diff --git a/test/common/thread_local/BUILD b/test/common/thread_local/BUILD index fdd28bc076fb..725568bc51c4 100644 --- a/test/common/thread_local/BUILD +++ b/test/common/thread_local/BUILD @@ -17,5 +17,6 @@ envoy_cc_test( "//source/common/stats:isolated_store_lib", "//source/common/thread_local:thread_local_lib", "//test/mocks/event:event_mocks", + "//test/test_common:test_runtime_lib", ], ) diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 9b5d100160d4..4ae9409fa452 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -4,12 +4,15 @@ #include "source/common/thread_local/thread_local_impl.h" #include "test/mocks/event/mocks.h" +#include "test/test_common/test_runtime.h" #include "gmock/gmock.h" using testing::_; using testing::InSequence; +using testing::NiceMock; using testing::Ref; +using testing::Return; using testing::ReturnPointee; namespace Envoy { @@ -52,10 +55,14 @@ class TestThreadLocalObject : public ThreadLocalObject { class ThreadLocalInstanceImplTest : public testing::Test { public: ThreadLocalInstanceImplTest() { - tls_.registerThread(main_dispatcher_, true); - EXPECT_EQ(&main_dispatcher_, &tls_.dispatcher()); + EXPECT_CALL(main_dispatcher_, isThreadSafe()).WillRepeatedly(Return(true)); + EXPECT_CALL(thread_dispatcher_, post(_)); tls_.registerThread(thread_dispatcher_, false); + // Register the main thread after the worker thread to ensure that the + // thread_local_data_.dispatcher_ of current test thread is set to the main thread dispatcher. + tls_.registerThread(main_dispatcher_, true); + EXPECT_EQ(&main_dispatcher_, &tls_.dispatcher()); } MOCK_METHOD(ThreadLocalObjectSharedPtr, createThreadLocal, (Event::Dispatcher & dispatcher)); @@ -75,8 +82,8 @@ class ThreadLocalInstanceImplTest : public testing::Test { int freeSlotIndexesListSize() { return tls_.free_slot_indexes_.size(); } InstanceImpl tls_; - Event::MockDispatcher main_dispatcher_{"test_main_thread"}; - Event::MockDispatcher thread_dispatcher_{"test_worker_thread"}; + NiceMock main_dispatcher_{"test_main_thread"}; + NiceMock thread_dispatcher_{"test_worker_thread"}; }; TEST_F(ThreadLocalInstanceImplTest, All) { @@ -343,5 +350,90 @@ TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) { tls.shutdownThread(); } +TEST(ThreadLocalInstanceImplDispatcherTest, DestroySlotOnWorker) { + InstanceImpl tls; + + Api::ApiPtr api = Api::createApiForTest(); + Event::MockDispatcher main_dispatcher{"test_main_thread"}; + Event::DispatcherPtr thread_dispatcher(api->allocateDispatcher("test_worker_thread")); + + tls.registerThread(main_dispatcher, true); + tls.registerThread(*thread_dispatcher, false); + + // Verify we have the expected dispatcher for the main thread. + EXPECT_EQ(&main_dispatcher, &tls.dispatcher()); + + auto slot = TypedSlot<>::makeUnique(tls); + + Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread( + [&main_dispatcher, &thread_dispatcher, &tls, &slot]() { + // Ensure that the dispatcher update in tls posted during the above registerThread happens. + thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock); + // Verify we have the expected dispatcher for the new thread thread. + EXPECT_EQ(thread_dispatcher.get(), &tls.dispatcher()); + + // Skip the asserts in the thread. Because the mock dispatcher will call + // callbacks directly in current thread and make the ASSERT_IS_MAIN_OR_TEST_THREAD fail. + Thread::SkipAsserts skip; + + EXPECT_CALL(main_dispatcher, isThreadSafe()).WillOnce(Return(false)); + // Destroy the slot on worker thread and expect the post() of main dispatcher to be called. + EXPECT_CALL(main_dispatcher, post(_)); + + slot.reset(); + + thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock); + }); + thread->join(); + + // Verify we still have the expected dispatcher for the main thread. + EXPECT_EQ(&main_dispatcher, &tls.dispatcher()); + + tls.shutdownGlobalThreading(); + tls.shutdownThread(); +} + +TEST(ThreadLocalInstanceImplDispatcherTest, DestroySlotOnWorkerButDisableRuntimeFeature) { + TestScopedRuntime runtime; + runtime.mergeValues({{"envoy.restart_features.allow_slot_destroy_on_worker_threads", "false"}}); + + InstanceImpl tls; + + Api::ApiPtr api = Api::createApiForTest(); + Event::MockDispatcher main_dispatcher{"test_main_thread"}; + Event::DispatcherPtr thread_dispatcher(api->allocateDispatcher("test_worker_thread")); + + tls.registerThread(main_dispatcher, true); + tls.registerThread(*thread_dispatcher, false); + + // Verify we have the expected dispatcher for the main thread. + EXPECT_EQ(&main_dispatcher, &tls.dispatcher()); + + auto slot = TypedSlot<>::makeUnique(tls); + + Thread::ThreadPtr thread = Thread::threadFactoryForTest().createThread( + [&main_dispatcher, &thread_dispatcher, &tls, &slot]() { + // Ensure that the dispatcher update in tls posted during the above registerThread happens. + thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock); + // Verify we have the expected dispatcher for the new thread thread. + EXPECT_EQ(thread_dispatcher.get(), &tls.dispatcher()); + + // Skip the asserts in the thread. + Thread::SkipAsserts skip; + // Destroy the slot on worker thread will not call post() of main dispatcher. + EXPECT_CALL(main_dispatcher, post(_)).Times(0); + slot.reset(); + + thread_dispatcher->run(Event::Dispatcher::RunType::NonBlock); + }); + thread->join(); + + // Verify we still have the expected dispatcher for the main thread. + EXPECT_EQ(&main_dispatcher, &tls.dispatcher()); + + tls.shutdownGlobalThreading(); + tls.shutdownThread(); +} + } // namespace ThreadLocal } // namespace Envoy