diff --git a/docs/root/configuration/listeners/stats.rst b/docs/root/configuration/listeners/stats.rst index 09ad6858fbe2..eb82810f6972 100644 --- a/docs/root/configuration/listeners/stats.rst +++ b/docs/root/configuration/listeners/stats.rst @@ -17,6 +17,7 @@ Every listener has a statistics tree rooted at *listener.
.* with the fo downstream_cx_active, Gauge, Total active connections downstream_cx_length_ms, Histogram, Connection length milliseconds downstream_cx_overflow, Counter, Total connections rejected due to enforcement of listener connection limit + downstream_cx_overload_reject, Counter, Total connections rejected due to configured overload actions downstream_pre_cx_timeout, Counter, Sockets that timed out during listener filter processing downstream_pre_cx_active, Gauge, Sockets currently undergoing listener filter processing global_cx_overflow, Counter, Total connections rejected due to enforecement of the global connection limit diff --git a/docs/root/configuration/operations/overload_manager/overload_manager.rst b/docs/root/configuration/operations/overload_manager/overload_manager.rst index ade5201f5c4c..22e7e70b33b5 100644 --- a/docs/root/configuration/operations/overload_manager/overload_manager.rst +++ b/docs/root/configuration/operations/overload_manager/overload_manager.rst @@ -68,14 +68,27 @@ Overload actions The following overload actions are supported: -.. csv-table:: - :header: Name, Description +.. list-table:: + :header-rows: 1 :widths: 1, 2 - envoy.overload_actions.stop_accepting_requests, Envoy will immediately respond with a 503 response code to new requests - envoy.overload_actions.disable_http_keepalive, Envoy will stop accepting streams on incoming HTTP connections - envoy.overload_actions.stop_accepting_connections, Envoy will stop accepting new network connections on its configured listeners - envoy.overload_actions.shrink_heap, Envoy will periodically try to shrink the heap by releasing free memory to the system + * - Name + - Description + + * - envoy.overload_actions.stop_accepting_requests + - Envoy will immediately respond with a 503 response code to new requests + + * - envoy.overload_actions.disable_http_keepalive + - Envoy will stop accepting streams on incoming HTTP connections + + * - envoy.overload_actions.stop_accepting_connections + - Envoy will stop accepting new network connections on its configured listeners + + * - envoy.overload_actions.reject_incoming_connections + - Envoy will reject incoming connections on its configured listeners without processing any data + + * - envoy.overload_actions.shrink_heap + - Envoy will periodically try to shrink the heap by releasing free memory to the system Limiting Active Connections --------------------------- diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 0c24a46dd201..6e5fc92acf08 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -27,6 +27,7 @@ New Features * grpc: implemented header value syntax support when defining :ref:`initial metadata ` for gRPC-based `ext_authz` :ref:`HTTP ` and :ref:`network ` filters, and :ref:`ratelimit ` filters. * health_check: added option to use :ref:`no_traffic_healthy_interval ` which allows a different no traffic interval when the host is healthy. * mongo_proxy: the list of commands to produce metrics for is now :ref:`configurable `. +* tcp: added a new :ref:`envoy.overload_actions.reject_incoming_connections ` action to reject incoming TCP connections. Deprecated ---------- diff --git a/include/envoy/network/connection_handler.h b/include/envoy/network/connection_handler.h index ea804eba5789..c42cc290cd61 100644 --- a/include/envoy/network/connection_handler.h +++ b/include/envoy/network/connection_handler.h @@ -94,6 +94,12 @@ class ConnectionHandler { */ virtual void enableListeners() PURE; + /** + * Set the fraction of connections the listeners should reject. + * @param reject_fraction a value between 0 (reject none) and 1 (reject all). + */ + virtual void setListenerRejectFraction(float reject_fraction) PURE; + /** * @return the stat prefix used for per-handler stats. */ diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index f74d6416103a..4401df6cc20c 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -197,10 +197,14 @@ class TcpListenerCallbacks { */ virtual void onAccept(ConnectionSocketPtr&& socket) PURE; + enum class RejectCause { + GlobalCxLimit, + OverloadAction, + }; /** * Called when a new connection is rejected. */ - virtual void onReject() PURE; + virtual void onReject(RejectCause cause) PURE; }; /** @@ -324,6 +328,12 @@ class Listener { * Enable accepting new connections. */ virtual void enable() PURE; + + /** + * Set the fraction of incoming connections that will be closed immediately + * after being opened. + */ + virtual void setRejectFraction(float reject_fraction) PURE; }; using ListenerPtr = std::unique_ptr; diff --git a/include/envoy/server/overload_manager.h b/include/envoy/server/overload_manager.h index 77c6c21c097d..8f5914fe34f3 100644 --- a/include/envoy/server/overload_manager.h +++ b/include/envoy/server/overload_manager.h @@ -62,6 +62,10 @@ class OverloadActionNameValues { // Overload action to stop accepting new connections. const std::string StopAcceptingConnections = "envoy.overload_actions.stop_accepting_connections"; + // Overload action to reject (accept and then close) new connections. + const std::string RejectIncomingConnections = + "envoy.overload_actions.reject_incoming_connections"; + // Overload action to try to shrink the heap by releasing free memory. const std::string ShrinkHeap = "envoy.overload_actions.shrink_heap"; }; diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 612aef3b7c41..6cdbb623b720 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -162,8 +162,8 @@ Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& s Network::TcpListenerCallbacks& cb, bool bind_to_port, uint32_t backlog_size) { ASSERT(isThreadSafe()); - return std::make_unique(*this, std::move(socket), cb, bind_to_port, - backlog_size); + return std::make_unique( + *this, api_.randomGenerator(), std::move(socket), cb, bind_to_port, backlog_size); } Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr socket, diff --git a/source/common/network/tcp_listener_impl.cc b/source/common/network/tcp_listener_impl.cc index 5c39ec692f89..91975d6ca5d1 100644 --- a/source/common/network/tcp_listener_impl.cc +++ b/source/common/network/tcp_listener_impl.cc @@ -62,7 +62,11 @@ void TcpListenerImpl::onSocketEvent(short flags) { if (rejectCxOverGlobalLimit()) { // The global connection limit has been reached. io_handle->close(); - cb_.onReject(); + cb_.onReject(TcpListenerCallbacks::RejectCause::GlobalCxLimit); + continue; + } else if (random_.bernoulli(reject_fraction_)) { + io_handle->close(); + cb_.onReject(TcpListenerCallbacks::RejectCause::OverloadAction); continue; } @@ -106,9 +110,11 @@ void TcpListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socke } } -TcpListenerImpl::TcpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket, - TcpListenerCallbacks& cb, bool bind_to_port, uint32_t backlog_size) - : BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), backlog_size_(backlog_size) { +TcpListenerImpl::TcpListenerImpl(Event::DispatcherImpl& dispatcher, Random::RandomGenerator& random, + SocketSharedPtr socket, TcpListenerCallbacks& cb, + bool bind_to_port, uint32_t backlog_size) + : BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), backlog_size_(backlog_size), + random_(random), reject_fraction_(0.0) { if (bind_to_port) { setupServerSocket(dispatcher, *socket_); } @@ -118,5 +124,10 @@ void TcpListenerImpl::enable() { file_event_->setEnabled(Event::FileReadyType::R void TcpListenerImpl::disable() { file_event_->setEnabled(0); } +void TcpListenerImpl::setRejectFraction(const float reject_fraction) { + ASSERT(0 <= reject_fraction && reject_fraction <= 1); + reject_fraction_ = reject_fraction; +} + } // namespace Network } // namespace Envoy diff --git a/source/common/network/tcp_listener_impl.h b/source/common/network/tcp_listener_impl.h index 56b6725b36c9..5ecec192abf2 100644 --- a/source/common/network/tcp_listener_impl.h +++ b/source/common/network/tcp_listener_impl.h @@ -1,5 +1,6 @@ #pragma once +#include "envoy/common/random_generator.h" #include "envoy/runtime/runtime.h" #include "absl/strings/string_view.h" @@ -13,10 +14,12 @@ namespace Network { */ class TcpListenerImpl : public BaseListenerImpl { public: - TcpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket, - TcpListenerCallbacks& cb, bool bind_to_port, uint32_t backlog_size); + TcpListenerImpl(Event::DispatcherImpl& dispatcher, Random::RandomGenerator& random, + SocketSharedPtr socket, TcpListenerCallbacks& cb, bool bind_to_port, + uint32_t backlog_size); void disable() override; void enable() override; + void setRejectFraction(float reject_fraction) override; static const absl::string_view GlobalMaxCxRuntimeKey; @@ -33,7 +36,9 @@ class TcpListenerImpl : public BaseListenerImpl { // rejected/closed. If the accepted socket is to be admitted, false is returned. static bool rejectCxOverGlobalLimit(); + Random::RandomGenerator& random_; Event::FileEventPtr file_event_; + float reject_fraction_; }; } // namespace Network diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h index e857a0150f25..d555649833bc 100644 --- a/source/common/network/udp_listener_impl.h +++ b/source/common/network/udp_listener_impl.h @@ -30,6 +30,7 @@ class UdpListenerImpl : public BaseListenerImpl, // Network::Listener Interface void disable() override; void enable() override; + void setRejectFraction(float) override {} // Network::UdpListener Interface Event::Dispatcher& dispatcher() override; diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index fd5586fb78cd..f819f5843c7b 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -66,6 +66,9 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list if (disable_listeners_) { details.listener_->pauseListening(); } + if (auto* listener = details.listener_->listener(); listener != nullptr) { + listener->setRejectFraction(listener_reject_fraction_); + } listeners_.emplace_back(config.listenSocketFactory().localAddress(), std::move(details)); } @@ -148,6 +151,13 @@ void ConnectionHandlerImpl::enableListeners() { } } +void ConnectionHandlerImpl::setListenerRejectFraction(float reject_fraction) { + listener_reject_fraction_ = reject_fraction; + for (auto& listener : listeners_) { + listener.second.listener_->listener()->setRejectFraction(reject_fraction); + } +} + void ConnectionHandlerImpl::ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) { ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_); ActiveConnections& active_connections = connection.active_connections_; @@ -391,6 +401,17 @@ void ConnectionHandlerImpl::ActiveTcpListener::onAccept(Network::ConnectionSocke onAcceptWorker(std::move(socket), config_->handOffRestoredDestinationConnections(), false); } +void ConnectionHandlerImpl::ActiveTcpListener::onReject(RejectCause cause) { + switch (cause) { + case RejectCause::GlobalCxLimit: + stats_.downstream_global_cx_overflow_.inc(); + break; + case RejectCause::OverloadAction: + stats_.downstream_cx_overload_reject_.inc(); + break; + } +} + void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker( Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, bool rebalanced) { diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index 62ddf0c11be7..66d317e8f10e 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -30,6 +30,7 @@ namespace Server { COUNTER(downstream_cx_destroy) \ COUNTER(downstream_cx_overflow) \ COUNTER(downstream_cx_total) \ + COUNTER(downstream_cx_overload_reject) \ COUNTER(downstream_global_cx_overflow) \ COUNTER(downstream_pre_cx_timeout) \ COUNTER(no_filter_chain_match) \ @@ -82,6 +83,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, void stopListeners() override; void disableListeners() override; void enableListeners() override; + void setListenerRejectFraction(float reject_fraction) override; const std::string& statPrefix() const override { return per_handler_stat_prefix_; } /** @@ -133,7 +135,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, // Network::TcpListenerCallbacks void onAccept(Network::ConnectionSocketPtr&& socket) override; - void onReject() override { stats_.downstream_global_cx_overflow_.inc(); } + void onReject(RejectCause) override; // ActiveListenerImplBase Network::Listener* listener() override { return listener_.get(); } @@ -361,6 +363,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, std::list> listeners_; std::atomic num_handler_connections_{}; bool disable_listeners_; + float listener_reject_fraction_{0}; }; class ActiveUdpListenerBase : public ConnectionHandlerImpl::ActiveListenerImplBase, diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 1fd18f106618..b659ffec6e06 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -31,6 +31,9 @@ WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks, overload_manager.registerForAction( OverloadActionNames::get().StopAcceptingConnections, *dispatcher_, [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); }); + overload_manager.registerForAction( + OverloadActionNames::get().RejectIncomingConnections, *dispatcher_, + [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); }); } void WorkerImpl::addListener(absl::optional overridden_listener, @@ -149,5 +152,9 @@ void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) { } } +void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) { + handler_->setListenerRejectFraction(static_cast(state.value())); +} + } // namespace Server } // namespace Envoy diff --git a/source/server/worker_impl.h b/source/server/worker_impl.h index c4cb4a58c2b5..22513b594e5d 100644 --- a/source/server/worker_impl.h +++ b/source/server/worker_impl.h @@ -58,6 +58,7 @@ class WorkerImpl : public Worker, Logger::Loggable { private: void threadRoutine(GuardDog& guard_dog); void stopAcceptingConnectionsCb(OverloadActionState state); + void rejectIncomingConnectionsCb(OverloadActionState state); ThreadLocal::Instance& tls_; ListenerHooks& hooks_; diff --git a/test/common/network/dns_impl_test.cc b/test/common/network/dns_impl_test.cc index ea83c16c21e7..339cfb4abc89 100644 --- a/test/common/network/dns_impl_test.cc +++ b/test/common/network/dns_impl_test.cc @@ -281,7 +281,7 @@ class TestDnsServer : public TcpListenerCallbacks { queries_.emplace_back(query); } - void onReject() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + void onReject(RejectCause) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void addHosts(const std::string& hostname, const IpList& ip, const RecordType& type) { if (type == RecordType::A) { diff --git a/test/common/network/listener_impl_test.cc b/test/common/network/listener_impl_test.cc index 8056826de91a..21e4673ce643 100644 --- a/test/common/network/listener_impl_test.cc +++ b/test/common/network/listener_impl_test.cc @@ -1,3 +1,5 @@ +#include + #include "envoy/config/core/v3/base.pb.h" #include "envoy/network/exception.h" @@ -65,10 +67,11 @@ TEST_P(ListenerImplDeathTest, ErrorCallback) { class TestTcpListenerImpl : public TcpListenerImpl { public: - TestTcpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket, - TcpListenerCallbacks& cb, bool bind_to_port, + TestTcpListenerImpl(Event::DispatcherImpl& dispatcher, Random::RandomGenerator& random_generator, + SocketSharedPtr socket, TcpListenerCallbacks& cb, bool bind_to_port, uint32_t tcp_backlog = ENVOY_TCP_BACKLOG_SIZE) - : TcpListenerImpl(dispatcher, std::move(socket), cb, bind_to_port, tcp_backlog) {} + : TcpListenerImpl(dispatcher, random_generator, std::move(socket), cb, bind_to_port, + tcp_backlog) {} MOCK_METHOD(Address::InstanceConstSharedPtr, getLocalAddress, (os_fd_t fd)); }; @@ -82,6 +85,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, TcpListenerImplTest, TEST_P(TcpListenerImplTest, SetListeningSocketOptionsSuccess) { Network::MockTcpListenerCallbacks listener_callbacks; Network::MockConnectionHandler connection_handler; + Random::MockRandomGenerator random_generator; auto socket = std::make_shared( Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); @@ -89,13 +93,15 @@ TEST_P(TcpListenerImplTest, SetListeningSocketOptionsSuccess) { socket->addOption(option); EXPECT_CALL(*option, setOption(_, envoy::config::core::v3::SocketOption::STATE_LISTENING)) .WillOnce(Return(true)); - TestTcpListenerImpl listener(dispatcherImpl(), socket, listener_callbacks, true); + TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, listener_callbacks, + true); } // Test that an exception is thrown if there is an error setting socket options. TEST_P(TcpListenerImplTest, SetListeningSocketOptionsError) { Network::MockTcpListenerCallbacks listener_callbacks; Network::MockConnectionHandler connection_handler; + Random::MockRandomGenerator random_generator; auto socket = std::make_shared( Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); @@ -103,10 +109,11 @@ TEST_P(TcpListenerImplTest, SetListeningSocketOptionsError) { socket->addOption(option); EXPECT_CALL(*option, setOption(_, envoy::config::core::v3::SocketOption::STATE_LISTENING)) .WillOnce(Return(false)); - EXPECT_THROW_WITH_MESSAGE(TestTcpListenerImpl(dispatcherImpl(), socket, listener_callbacks, true), - CreateListenerException, - fmt::format("cannot set post-listen socket option on socket: {}", - socket->localAddress()->asString())); + EXPECT_THROW_WITH_MESSAGE( + TestTcpListenerImpl(dispatcherImpl(), random_generator, socket, listener_callbacks, true), + CreateListenerException, + fmt::format("cannot set post-listen socket option on socket: {}", + socket->localAddress()->asString())); } TEST_P(TcpListenerImplTest, UseActualDst) { @@ -115,10 +122,13 @@ TEST_P(TcpListenerImplTest, UseActualDst) { auto socketDst = std::make_shared(alt_address_, nullptr, false); Network::MockTcpListenerCallbacks listener_callbacks1; Network::MockConnectionHandler connection_handler; + Random::MockRandomGenerator random_generator; // Do not redirect since use_original_dst is false. - Network::TestTcpListenerImpl listener(dispatcherImpl(), socket, listener_callbacks1, true); + Network::TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, + listener_callbacks1, true); Network::MockTcpListenerCallbacks listener_callbacks2; - Network::TestTcpListenerImpl listenerDst(dispatcherImpl(), socketDst, listener_callbacks2, false); + Network::TestTcpListenerImpl listenerDst(dispatcherImpl(), random_generator, socketDst, + listener_callbacks2, false); Network::ClientConnectionPtr client_connection = dispatcher_->createClientConnection( socket->localAddress(), Network::Address::InstanceConstSharedPtr(), @@ -175,7 +185,8 @@ TEST_P(TcpListenerImplTest, GlobalConnectionLimitEnforcement) { }; initiate_connections(5); - EXPECT_CALL(listener_callbacks, onReject()).Times(3); + EXPECT_CALL(listener_callbacks, onReject(TcpListenerCallbacks::RejectCause::GlobalCxLimit)) + .Times(3); dispatcher_->run(Event::Dispatcher::RunType::Block); // We expect any server-side connections that get created to populate 'server_connections'. @@ -185,7 +196,8 @@ TEST_P(TcpListenerImplTest, GlobalConnectionLimitEnforcement) { Runtime::LoaderSingleton::getExisting()->mergeValues( {{"overload.global_downstream_max_connections", "3"}}); initiate_connections(5); - EXPECT_CALL(listener_callbacks, onReject()).Times(4); + EXPECT_CALL(listener_callbacks, onReject(TcpListenerCallbacks::RejectCause::GlobalCxLimit)) + .Times(4); dispatcher_->run(Event::Dispatcher::RunType::Block); EXPECT_EQ(3, server_connections.size()); @@ -214,8 +226,10 @@ TEST_P(TcpListenerImplTest, WildcardListenerUseActualDst) { Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); Network::MockTcpListenerCallbacks listener_callbacks; Network::MockConnectionHandler connection_handler; + Random::MockRandomGenerator random_generator; // Do not redirect since use_original_dst is false. - Network::TestTcpListenerImpl listener(dispatcherImpl(), socket, listener_callbacks, true); + Network::TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, + listener_callbacks, true); auto local_dst_address = Network::Utility::getAddressWithPort( *Network::Test::getCanonicalLoopbackAddress(version_), socket->localAddress()->ip()->port()); @@ -253,11 +267,13 @@ TEST_P(TcpListenerImplTest, WildcardListenerIpv4Compat) { options, true); Network::MockTcpListenerCallbacks listener_callbacks; Network::MockConnectionHandler connection_handler; + Random::MockRandomGenerator random_generator; ASSERT_TRUE(socket->localAddress()->ip()->isAnyAddress()); // Do not redirect since use_original_dst is false. - Network::TestTcpListenerImpl listener(dispatcherImpl(), socket, listener_callbacks, true); + Network::TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, + listener_callbacks, true); auto listener_address = Network::Utility::getAddressWithPort( *Network::Test::getCanonicalLoopbackAddress(version_), socket->localAddress()->ip()->port()); @@ -291,7 +307,9 @@ TEST_P(TcpListenerImplTest, DisableAndEnableListener) { Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); MockTcpListenerCallbacks listener_callbacks; MockConnectionCallbacks connection_callbacks; - TestTcpListenerImpl listener(dispatcherImpl(), socket, listener_callbacks, true); + Random::MockRandomGenerator random_generator; + TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, listener_callbacks, + true); // When listener is disabled, the timer should fire before any connection is accepted. listener.disable(); @@ -325,6 +343,132 @@ TEST_P(TcpListenerImplTest, DisableAndEnableListener) { dispatcher_->run(Event::Dispatcher::RunType::Block); } +TEST_P(TcpListenerImplTest, SetListenerRejectFractionZero) { + auto socket = std::make_shared( + Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); + MockTcpListenerCallbacks listener_callbacks; + MockConnectionCallbacks connection_callbacks; + Random::MockRandomGenerator random_generator; + TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, listener_callbacks, + true); + + listener.setRejectFraction(0); + + // This connection will be accepted and not rejected. + { + testing::InSequence s1; + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::Connected)); + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::LocalClose)); + } + EXPECT_CALL(listener_callbacks, onAccept_(_)).WillOnce([&] { dispatcher_->exit(); }); + + ClientConnectionPtr client_connection = + dispatcher_->createClientConnection(socket->localAddress(), Address::InstanceConstSharedPtr(), + Network::Test::createRawBufferSocket(), nullptr); + client_connection->addConnectionCallbacks(connection_callbacks); + client_connection->connect(); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + // Now that we've seen that the connection hasn't been closed by the listener, make sure to close + // it. + client_connection->close(ConnectionCloseType::NoFlush); +} + +TEST_P(TcpListenerImplTest, SetListenerRejectFractionIntermediate) { + auto socket = std::make_shared( + Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); + MockTcpListenerCallbacks listener_callbacks; + MockConnectionCallbacks connection_callbacks; + Random::MockRandomGenerator random_generator; + TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, listener_callbacks, + true); + + listener.setRejectFraction(0.5f); + + // The first connection will be rejected because the random value is too small. + { + testing::InSequence s1; + EXPECT_CALL(random_generator, random()).WillOnce(Return(0)); + EXPECT_CALL(listener_callbacks, onReject(TcpListenerCallbacks::RejectCause::OverloadAction)); + } + { + testing::InSequence s2; + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::Connected)); + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::RemoteClose)).WillOnce([&] { + dispatcher_->exit(); + }); + } + + { + ClientConnectionPtr client_connection = dispatcher_->createClientConnection( + socket->localAddress(), Address::InstanceConstSharedPtr(), + Network::Test::createRawBufferSocket(), nullptr); + client_connection->addConnectionCallbacks(connection_callbacks); + client_connection->connect(); + dispatcher_->run(Event::Dispatcher::RunType::Block); + } + + // The second connection rolls better on initiative and is accepted. + { + testing::InSequence s1; + EXPECT_CALL(random_generator, random()).WillOnce(Return(std::numeric_limits::max())); + EXPECT_CALL(listener_callbacks, onAccept_(_)); + } + { + testing::InSequence s2; + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::Connected)).WillOnce([&] { + dispatcher_->exit(); + }); + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::RemoteClose)).Times(0); + } + + { + ClientConnectionPtr client_connection = dispatcher_->createClientConnection( + socket->localAddress(), Address::InstanceConstSharedPtr(), + Network::Test::createRawBufferSocket(), nullptr); + client_connection->addConnectionCallbacks(connection_callbacks); + client_connection->connect(); + dispatcher_->run(Event::Dispatcher::RunType::Block); + + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::LocalClose)); + // Now that we've seen that the connection hasn't been closed by the listener, make sure to + // close it. + client_connection->close(ConnectionCloseType::NoFlush); + } +} + +TEST_P(TcpListenerImplTest, SetListenerRejectFractionAll) { + auto socket = std::make_shared( + Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); + MockTcpListenerCallbacks listener_callbacks; + MockConnectionCallbacks connection_callbacks; + Random::MockRandomGenerator random_generator; + TestTcpListenerImpl listener(dispatcherImpl(), random_generator, socket, listener_callbacks, + true); + + listener.setRejectFraction(1); + + { + testing::InSequence s1; + EXPECT_CALL(listener_callbacks, onReject(TcpListenerCallbacks::RejectCause::OverloadAction)); + } + + { + testing::InSequence s2; + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::Connected)); + EXPECT_CALL(connection_callbacks, onEvent(ConnectionEvent::RemoteClose)).WillOnce([&] { + dispatcher_->exit(); + }); + } + + ClientConnectionPtr client_connection = + dispatcher_->createClientConnection(socket->localAddress(), Address::InstanceConstSharedPtr(), + Network::Test::createRawBufferSocket(), nullptr); + client_connection->addConnectionCallbacks(connection_callbacks); + client_connection->connect(); + dispatcher_->run(Event::Dispatcher::RunType::Block); +} + } // namespace } // namespace Network } // namespace Envoy diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 304de415e7d3..de7b843a72d0 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -131,7 +131,7 @@ class MockTcpListenerCallbacks : public TcpListenerCallbacks { void onAccept(ConnectionSocketPtr&& socket) override { onAccept_(socket); } MOCK_METHOD(void, onAccept_, (ConnectionSocketPtr & socket)); - MOCK_METHOD(void, onReject, ()); + MOCK_METHOD(void, onReject, (RejectCause), (override)); }; class MockUdpListenerCallbacks : public UdpListenerCallbacks { @@ -391,6 +391,7 @@ class MockListener : public Listener { MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, enable, ()); MOCK_METHOD(void, disable, ()); + MOCK_METHOD(void, setRejectFraction, (float)); }; class MockConnectionHandler : public ConnectionHandler { @@ -412,6 +413,7 @@ class MockConnectionHandler : public ConnectionHandler { MOCK_METHOD(void, stopListeners, ()); MOCK_METHOD(void, disableListeners, ()); MOCK_METHOD(void, enableListeners, ()); + MOCK_METHOD(void, setListenerRejectFraction, (float), (override)); MOCK_METHOD(const std::string&, statPrefix, (), (const)); }; @@ -501,6 +503,7 @@ class MockUdpListener : public UdpListener { MOCK_METHOD(void, onDestroy, ()); MOCK_METHOD(void, enable, ()); MOCK_METHOD(void, disable, ()); + MOCK_METHOD(void, setRejectFraction, (float), (override)); MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); MOCK_METHOD(Address::InstanceConstSharedPtr&, localAddress, (), (const)); MOCK_METHOD(Api::IoCallUint64Result, send, (const UdpSendData&)); diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index dbb6a5698e43..b286dc588dd2 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -176,6 +176,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::LoggableaddListener(absl::nullopt, *test_listener); } -TEST_F(ConnectionHandlerTest, DisableListenerAfterStop) { +TEST_F(ConnectionHandlerTest, SetListenerRejectFraction) { InSequence s; Network::TcpListenerCallbacks* listener_callbacks; @@ -461,10 +462,25 @@ TEST_F(ConnectionHandlerTest, DisableListenerAfterStop) { EXPECT_CALL(*socket_factory_, localAddress()).WillOnce(ReturnRef(local_address_)); handler_->addListener(absl::nullopt, *test_listener); + EXPECT_CALL(*listener, setRejectFraction(0.1234f)); EXPECT_CALL(*listener, onDestroy()); - handler_->stopListeners(); - handler_->disableListeners(); + handler_->setListenerRejectFraction(0.1234f); +} + +TEST_F(ConnectionHandlerTest, AddListenerSetRejectFraction) { + InSequence s; + + Network::TcpListenerCallbacks* listener_callbacks; + auto listener = new NiceMock(); + TestListener* test_listener = + addListener(1, false, false, "test_listener", listener, &listener_callbacks); + EXPECT_CALL(*listener, setRejectFraction(0.12345f)); + EXPECT_CALL(*socket_factory_, localAddress()).WillOnce(ReturnRef(local_address_)); + EXPECT_CALL(*listener, onDestroy()); + + handler_->setListenerRejectFraction(0.12345f); + handler_->addListener(absl::nullopt, *test_listener); } TEST_F(ConnectionHandlerTest, DestroyCloseConnections) { @@ -1096,6 +1112,36 @@ TEST_F(ConnectionHandlerTest, TcpListenerRemoveFilterChain) { handler_.reset(); } +TEST_F(ConnectionHandlerTest, TcpListenerGlobalCxLimitReject) { + Network::TcpListenerCallbacks* listener_callbacks; + auto listener = new NiceMock(); + TestListener* test_listener = + addListener(1, true, false, "test_listener", listener, &listener_callbacks); + EXPECT_CALL(*socket_factory_, localAddress()).WillOnce(ReturnRef(local_address_)); + handler_->addListener(absl::nullopt, *test_listener); + + listener_callbacks->onReject(Network::TcpListenerCallbacks::RejectCause::GlobalCxLimit); + + EXPECT_EQ(1UL, TestUtility::findCounter(stats_store_, "downstream_global_cx_overflow")->value()); + EXPECT_EQ(0UL, TestUtility::findCounter(stats_store_, "downstream_cx_overload_reject")->value()); + EXPECT_CALL(*listener, onDestroy()); +} + +TEST_F(ConnectionHandlerTest, TcpListenerOverloadActionReject) { + Network::TcpListenerCallbacks* listener_callbacks; + auto listener = new NiceMock(); + TestListener* test_listener = + addListener(1, true, false, "test_listener", listener, &listener_callbacks); + EXPECT_CALL(*socket_factory_, localAddress()).WillOnce(ReturnRef(local_address_)); + handler_->addListener(absl::nullopt, *test_listener); + + listener_callbacks->onReject(Network::TcpListenerCallbacks::RejectCause::OverloadAction); + + EXPECT_EQ(1UL, TestUtility::findCounter(stats_store_, "downstream_cx_overload_reject")->value()); + EXPECT_EQ(0UL, TestUtility::findCounter(stats_store_, "downstream_global_cx_overflow")->value()); + EXPECT_CALL(*listener, onDestroy()); +} + // Listener Filter matchers works. TEST_F(ConnectionHandlerTest, ListenerFilterWorks) { Network::TcpListenerCallbacks* listener_callbacks;