Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make grpcmux pause/resume come in pair by returning a RAII obj which resumes requests on destruction. #11739

Merged
merged 21 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a9b9f8b
doc: fix SNI FAQ link (#10227)
lizan Mar 2, 2020
ff341df
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz Mar 5, 2020
963e42e
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz Mar 12, 2020
60953f8
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz Apr 18, 2020
8991be3
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz May 27, 2020
9ed6dee
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz Jun 24, 2020
92dd072
make api pause/resume in pair by returning a RAII obj which resumes r…
stevenzzzz Jun 24, 2020
79eb46a
cancel resume_cds_ on shutdown of clustermanagerImpl
stevenzzzz Jun 24, 2020
d0ce022
format errors fix
stevenzzzz Jun 24, 2020
bf897cb
move the ABSL_MUST_USE_RESULT attr to the beginning to see if gcc wil…
stevenzzzz Jun 25, 2020
c51ef88
pass by reference type_urls into cleanup in scoped rds
stevenzzzz Jun 26, 2020
63476f8
review fixes per Harvey comments.
stevenzzzz Jun 29, 2020
3944e77
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz Jun 29, 2020
49d4e1d
Merge branch 'master' into raii-pause
stevenzzzz Jun 29, 2020
1fcc16e
remove unused parameter, weird it wasnt caught previously
stevenzzzz Jun 29, 2020
dbbc4a0
Merge branch 'master' of https://github.com/envoyproxy/envoy
stevenzzzz Jul 6, 2020
dd8972f
Merge branch 'master' of https://github.com/envoyproxy/envoy into master
stevenzzzz Jul 8, 2020
6e0a725
merge upstream master
stevenzzzz Jul 8, 2020
60c22ec
fix-format using clang 10
stevenzzzz Jul 9, 2020
34408d9
Merge branch 'master' of https://github.com/envoyproxy/envoy into master
stevenzzzz Jul 9, 2020
10e67b7
Merge branch 'master' into raii-pause
stevenzzzz Jul 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ envoy_cc_library(
deps = [
":subscription_interface",
"//include/envoy/stats:stats_macros",
"//source/common/common:cleanup_lib",
"//source/common/protobuf",
],
)
Expand Down
27 changes: 10 additions & 17 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
#include "envoy/config/subscription.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/cleanup.h"
#include "common/protobuf/protobuf.h"

namespace Envoy {
namespace Config {

using ScopedResume = std::unique_ptr<Cleanup>;
/**
* All control plane related stats. @see stats_macros.h
*/
Expand Down Expand Up @@ -62,32 +64,23 @@ class GrpcMux {
* requests may later be resumed with resume().
* @param type_url type URL corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster.
*
* @return a ScopedResume object, which when destructed, resumes the paused discovery requests.
* A discovery request will be sent if one would have been sent during the pause.
*/
virtual void pause(const std::string& type_url) PURE;
ABSL_MUST_USE_RESULT virtual ScopedResume pause(const std::string& type_url) PURE;

/**
* Pause discovery requests for given API types. This is useful when we're processing an update
* for LDS or CDS and don't want a flood of updates for RDS or EDS respectively. Discovery
* requests may later be resumed with resume().
* @param type_urls type URLs corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster.
*
* @return a ScopedResume object, which when destructed, resumes the paused discovery requests.
* A discovery request will be sent if one would have been sent during the pause.
*/
virtual void pause(const std::vector<std::string> type_urls) PURE;

/**
* Resume discovery requests for a given API type. This will send a discovery request if one would
* have been sent during the pause.
* @param type_url type URL corresponding to xDS API e.g. type.googleapis.com/envoy.api.v2.Cluster
*/
virtual void resume(const std::string& type_url) PURE;

/**
* Resume discovery requests for given API types. This will send a discovery request if one would
* have been sent during the pause.
* @param type_urls type URLs corresponding to xDS API e.g.
* type.googleapis.com/envoy.api.v2.Cluster
*/
virtual void resume(const std::vector<std::string> type_urls) PURE;
ABSL_MUST_USE_RESULT virtual ScopedResume pause(const std::vector<std::string> type_urls) PURE;

/**
* Retrieves the current pause state as set by pause()/resume().
Expand Down
6 changes: 3 additions & 3 deletions source/common/common/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class CondVar {
* @return WaitStatus whether the condition timed out or not.
*/
template <class Rep, class Period>
WaitStatus waitFor(MutexBasicLockable& mutex,
std::chrono::duration<Rep, Period> duration) noexcept
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) {
WaitStatus waitFor(
MutexBasicLockable& mutex,
std::chrono::duration<Rep, Period> duration) noexcept ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex) {
return condvar_.WaitWithTimeout(&mutex.mutex_, absl::FromChrono(duration))
? WaitStatus::Timeout
: WaitStatus::NoTimeout;
Expand Down
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ envoy_cc_library(
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:cleanup_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:utility_lib",
"//source/common/memory:utils_lib",
Expand Down
47 changes: 21 additions & 26 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,37 +99,32 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
return watch;
}

void GrpcMuxImpl::pause(const std::string& type_url) {
ENVOY_LOG(debug, "Pausing discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
ASSERT(!api_state.paused_);
ASSERT(!api_state.pending_);
api_state.paused_ = true;
ScopedResume GrpcMuxImpl::pause(const std::string& type_url) {
return pause(std::vector<std::string>{type_url});
}

void GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
ScopedResume GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
pause(type_url);
}
}

void GrpcMuxImpl::resume(const std::string& type_url) {
ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
ASSERT(api_state.paused_);
api_state.paused_ = false;

if (api_state.pending_) {
ASSERT(api_state.subscribed_);
queueDiscoveryRequest(type_url);
api_state.pending_ = false;
ENVOY_LOG(debug, "Pausing discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
ASSERT(!api_state.paused_);
ASSERT(!api_state.pending_);
htuch marked this conversation as resolved.
Show resolved Hide resolved
api_state.paused_ = true;
}
}
return std::make_unique<Cleanup>([this, type_urls]() {
for (const auto& type_url : type_urls) {
ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url);
ApiState& api_state = api_state_[type_url];
ASSERT(api_state.paused_);
api_state.paused_ = false;

void GrpcMuxImpl::resume(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
resume(type_url);
}
if (api_state.pending_) {
ASSERT(api_state.subscribed_);
queueDiscoveryRequest(type_url);
api_state.pending_ = false;
}
}
});
}

bool GrpcMuxImpl::paused(const std::string& type_url) const {
Expand Down
17 changes: 8 additions & 9 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

namespace Envoy {
namespace Config {

/**
* ADS API implementation that fetches via gRPC.
*/
Expand All @@ -38,10 +37,8 @@ class GrpcMuxImpl : public GrpcMux,
void start() override;

// GrpcMux
void pause(const std::string& type_url) override;
void pause(const std::vector<std::string> type_urls) override;
void resume(const std::string& type_url) override;
void resume(const std::vector<std::string> type_urls) override;
ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;
bool paused(const std::string& type_url) const override;
bool paused(const std::vector<std::string> type_urls) const override;

Expand Down Expand Up @@ -147,10 +144,12 @@ class NullGrpcMuxImpl : public GrpcMux,
GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse> {
public:
void start() override {}
void pause(const std::string&) override {}
void pause(const std::vector<std::string>) override {}
void resume(const std::string&) override {}
void resume(const std::vector<std::string>) override {}
ScopedResume pause(const std::string&) override {
return std::make_unique<Cleanup>([] {});
}
ScopedResume pause(const std::vector<std::string>) override {
return std::make_unique<Cleanup>([] {});
}
bool paused(const std::string&) const override { return false; }
bool paused(const std::vector<std::string>) const override { return false; }

Expand Down
4 changes: 1 addition & 3 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ void GrpcSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason reason
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::pause() { grpc_mux_->pause(type_url_); }

void GrpcSubscriptionImpl::resume() { grpc_mux_->resume(type_url_); }
ScopedResume GrpcSubscriptionImpl::pause() { return grpc_mux_->pause(type_url_); }

void GrpcSubscriptionImpl::disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
Expand Down
3 changes: 1 addition & 2 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ class GrpcSubscriptionImpl : public Subscription,

GrpcMuxSharedPtr grpcMux() { return grpc_mux_; }

void pause();
void resume();
ScopedResume pause();

private:
void disableInitFetchTimeoutTimer();
Expand Down
24 changes: 11 additions & 13 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,21 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client,
rate_limit_settings),
local_info_(local_info), transport_api_version_(transport_api_version) {}

void NewGrpcMuxImpl::pause(const std::string& type_url) { pausable_ack_queue_.pause(type_url); }

void NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
pause(type_url);
}
}

void NewGrpcMuxImpl::resume(const std::string& type_url) {
pausable_ack_queue_.resume(type_url);
trySendDiscoveryRequests();
ScopedResume NewGrpcMuxImpl::pause(const std::string& type_url) {
return pause(std::vector<std::string>{type_url});
}

void NewGrpcMuxImpl::resume(const std::vector<std::string> type_urls) {
ScopedResume NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
for (const auto& type_url : type_urls) {
resume(type_url);
pausable_ack_queue_.pause(type_url);
}

return std::make_unique<Cleanup>([this, type_urls]() {
for (const auto& type_url : type_urls) {
pausable_ack_queue_.resume(type_url);
trySendDiscoveryRequests();
}
});
}

bool NewGrpcMuxImpl::paused(const std::string& type_url) const {
Expand Down
6 changes: 2 additions & 4 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ class NewGrpcMuxImpl
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;

void pause(const std::string& type_url) override;
void pause(const std::vector<std::string> type_urls) override;
void resume(const std::string& type_url) override;
void resume(const std::vector<std::string> type_urls) override;
ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;
bool paused(const std::string& type_url) const override;
bool paused(const std::vector<std::string> type_urls) const override;

Expand Down
60 changes: 31 additions & 29 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ using Envoy::Config::ConfigProvider;
using Envoy::Config::ConfigProviderInstanceType;
using Envoy::Config::ConfigProviderManager;
using Envoy::Config::ConfigProviderPtr;
using Envoy::Config::ScopedResume;

namespace Envoy {
namespace Router {
Expand Down Expand Up @@ -225,40 +226,41 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
// NOTE: deletes are done before adds/updates.

absl::flat_hash_map<std::string, ScopedRouteInfoConstSharedPtr> to_be_removed_scopes;
// Destruction of resume_rds will lift the floodgate for new RDS subscriptions.
// Note in the case of partial acceptance, accepted RDS subscriptions should be started
// despite of any error.
ScopedResume resume_rds;
// If new route config sources come after the local init manager's initialize() been
// called, the init manager can't accept new targets. Instead we use a local override which will
// start new subscriptions but not wait on them to be ready.
std::unique_ptr<Init::ManagerImpl> noop_init_manager;
// NOTE: This should be defined after noop_init_manager as it depends on the
// noop_init_manager.
std::unique_ptr<Cleanup> resume_rds;
std::unique_ptr<Init::ManagerImpl> srds_init_mgr;
// NOTE: This should be defined after srds_init_mgr and resume_rds, as it depends on the
// srds_init_mgr, and we want a single RDS discovery request to be sent to management
// server.
std::unique_ptr<Cleanup> srds_initialization_continuation;
ASSERT(localInitManager().state() > Init::Manager::State::Uninitialized);
htuch marked this conversation as resolved.
Show resolved Hide resolved
const auto type_urls =
Envoy::Config::getAllVersionTypeUrls<envoy::config::route::v3::RouteConfiguration>();
// Pause RDS to not send a burst of RDS requests until we start all the new subscriptions.
// In the case that localInitManager is uninitialized, RDS is already paused
// either by Server init or LDS init.
if (factory_context_.clusterManager().adsMux()) {
resume_rds = factory_context_.clusterManager().adsMux()->pause(type_urls);
}
// if local init manager is initialized, the parent init manager may have gone away.
if (localInitManager().state() == Init::Manager::State::Initialized) {
const auto type_urls =
Envoy::Config::getAllVersionTypeUrls<envoy::config::route::v3::RouteConfiguration>();
noop_init_manager =
srds_init_mgr =
std::make_unique<Init::ManagerImpl>(fmt::format("SRDS {}:{}", name_, version_info));
// Pause RDS to not send a burst of RDS requests until we start all the new subscriptions.
// In the case if factory_context_.init_manager() is uninitialized, RDS is already paused
// either by Server init or LDS init.
if (factory_context_.clusterManager().adsMux()) {
factory_context_.clusterManager().adsMux()->pause(type_urls);
}
resume_rds = std::make_unique<Cleanup>([this, &noop_init_manager, version_info, type_urls] {
// For new RDS subscriptions created after listener warming up, we don't wait for them to
// warm up.
Init::WatcherImpl noop_watcher(
// Note: we just throw it away.
fmt::format("SRDS ConfigUpdate watcher {}:{}", name_, version_info),
[]() { /*Do nothing.*/ });
noop_init_manager->initialize(noop_watcher);
// New RDS subscriptions should have been created, now lift the floodgate.
// Note in the case of partial acceptance, accepted RDS subscriptions should be started
// despite of any error.
if (factory_context_.clusterManager().adsMux()) {
factory_context_.clusterManager().adsMux()->resume(type_urls);
}
});
srds_initialization_continuation =
std::make_unique<Cleanup>([this, &srds_init_mgr, version_info] {
// For new RDS subscriptions created after listener warming up, we don't wait for them to
// warm up.
Init::WatcherImpl noop_watcher(
// Note: we just throw it away.
fmt::format("SRDS ConfigUpdate watcher {}:{}", name_, version_info),
[]() { /*Do nothing.*/ });
srds_init_mgr->initialize(noop_watcher);
});
}

std::vector<std::string> exception_msgs;
Expand All @@ -268,7 +270,7 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
to_be_removed_rds_providers = removeScopes(removed_resources, version_info);
bool any_applied =
addOrUpdateScopes(added_resources,
(noop_init_manager == nullptr ? localInitManager() : *noop_init_manager),
(srds_init_mgr == nullptr ? localInitManager() : *srds_init_mgr),
version_info, exception_msgs) ||
!to_be_removed_rds_providers.empty();
ConfigSubscriptionCommonBase::onConfigUpdate();
Expand Down
6 changes: 2 additions & 4 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& r
void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
std::unique_ptr<Cleanup> maybe_eds_resume;
Config::ScopedResume maybe_resume_eds;
if (cm_.adsMux()) {
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::endpoint::v3::ClusterLoadAssignment>();
cm_.adsMux()->pause(type_urls);
maybe_eds_resume =
std::make_unique<Cleanup>([this, type_urls] { cm_.adsMux()->resume(type_urls); });
maybe_resume_eds = cm_.adsMux()->pause(type_urls);
}

ENVOY_LOG(info, "cds: add {} cluster(s), remove {} cluster(s)", added_resources.size(),
Expand Down
11 changes: 5 additions & 6 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,12 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
// If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
// should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
// avoid double pause ClusterLoadAssignment.
std::unique_ptr<Cleanup> maybe_eds_resume;
Config::ScopedResume maybe_resume_eds;
if (cm_.adsMux()) {
const auto type_urls =
Config::getAllVersionTypeUrls<envoy::config::endpoint::v3::ClusterLoadAssignment>();
if (!cm_.adsMux()->paused(type_urls)) {
cm_.adsMux()->pause(type_urls);
maybe_eds_resume =
std::make_unique<Cleanup>([this, type_urls] { cm_.adsMux()->resume(type_urls); });
maybe_resume_eds = cm_.adsMux()->pause(type_urls);
}
}
initializeSecondaryClusters();
Expand Down Expand Up @@ -805,9 +803,10 @@ void ClusterManagerImpl::updateClusterCounts() {
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
ads_mux_->pause(type_urls);
resume_cds_ = ads_mux_->pause(type_urls);
} else if (previous_warming > 0 && warming_clusters_.empty()) {
ads_mux_->resume(type_urls);
ASSERT(resume_cds_ != nullptr);
resume_cds_.reset();
}
}
cm_stats_.active_clusters_.set(active_clusters_.size());
Expand Down
Loading