Skip to content

Commit

Permalink
Review feedbacks fix, and some more changes to the config provider fr…
Browse files Browse the repository at this point in the history
…amework:

        * The current framework doesn't have a mechanism to enforce all registered DeltaMutableConfigProviderBase of a DeltaConfigSubscriptionInstance share the same config instance (will addresss it in another PR if desired), fix the bug in Del
taConfigSubscriptionInstance::applyDeltaConfigUpdate.
        * Add a complete_cb forDeltaConfigSubscriptionInstance::applyDeltaConfigUpdate() and DeltaMutableConfigProviderBase::onConfigUpdate(update_fn, complete_cb), as RouteConfigProvider instances has to be deleted in main thread, we need to de
struct the ScopedRouteInfo object in the complete_cb.
Signed-off-by: Xin Zhuang <stevenzzz@google.com>
  • Loading branch information
stevenzzzz committed Jul 12, 2019
1 parent 5cad3ee commit 50e8fb5
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 26 deletions.
19 changes: 14 additions & 5 deletions source/common/config/config_provider_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ bool ConfigSubscriptionInstance::checkAndApplyConfigUpdate(const Protobuf::Messa
}

void DeltaConfigSubscriptionInstance::applyDeltaConfigUpdate(
const std::function<void(const ConfigSharedPtr&)>& update_fn) {
const std::function<void(const ConfigSharedPtr&)>& update_fn, Event::PostCb complete_cb) {
// The Config implementation is assumed to be shared across the config providers bound to this
// subscription, therefore, simply propagating the update to all worker threads for a single bound
// provider will be sufficient.
Expand All @@ -92,10 +92,19 @@ void DeltaConfigSubscriptionInstance::applyDeltaConfigUpdate(
// needed. Such logic could be generalized as part of this framework such that this function owns
// the diffing and issues the corresponding call to add/modify/remove a resource according to a
// vector of functions passed by the caller.
auto* typed_provider =
static_cast<DeltaMutableConfigProviderBase*>(getAnyBoundMutableConfigProvider());
ConfigSharedPtr config = typed_provider->getConfig();
typed_provider->onConfigUpdate([config, update_fn]() { update_fn(config); });
// For now each config provider has its own copy of config, we need to propagate the update to
// every provider.
for (auto* provider : mutable_config_providers_) {
auto* typed_provider = static_cast<DeltaMutableConfigProviderBase*>(provider);
typed_provider->onConfigUpdate(
[update_fn, typed_provider]() {
// Note: this lambda is run on every worker thread, getting the config from within the
// lambda ensures us getting the per-worker config.
ConfigSharedPtr config = typed_provider->getConfig();
update_fn(config);
},
std::move(complete_cb));
}
}

ConfigProviderManagerImplBase::ConfigProviderManagerImplBase(Server::Admin& admin,
Expand Down
17 changes: 13 additions & 4 deletions source/common/config/config_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,11 @@ class DeltaConfigSubscriptionInstance : public ConfigSubscriptionCommonBase {
* subscription.
*
* @param update_fn the callback to run on each worker thread.
* @param complete_cb the callback to run on each worker thread. NOTE it's called each time a
* registered provider's update propagation finishes.
*/
void applyDeltaConfigUpdate(const std::function<void(const ConfigSharedPtr&)>& update_fn);
void applyDeltaConfigUpdate(const std::function<void(const ConfigSharedPtr&)>& update_fn,
Event::PostCb complete_cb = Event::PostCb());
};

/**
Expand Down Expand Up @@ -409,10 +412,16 @@ class DeltaMutableConfigProviderBase : public MutableConfigProviderCommonBase {

/**
* Propagates a delta config update to all workers.
* @param updateCb the callback to run on each worker.
* @param update_cb the callback to run on each worker.
* @param complete_cb the callback to run in main thread after the update propagation is done on
* every worker thread.
*/
void onConfigUpdate(Envoy::Event::PostCb update_cb) {
tls_->runOnAllThreads(std::move(update_cb));
void onConfigUpdate(Envoy::Event::PostCb update_cb, Event::PostCb complete_cb) {
if (complete_cb) {
tls_->runOnAllThreads(std::move(update_cb), std::move(complete_cb));
} else {
tls_->runOnAllThreads(std::move(update_cb));
}
}

protected:
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/scoped_config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class ScopedRouteInfo {
}
}
}
~ScopedRouteInfo() = default;

Router::ConfigConstSharedPtr routeConfig() const { return route_provider_->config(); }
const ScopeKey& scopeKey() const { return scope_key_; }
const envoy::api::v2::ScopedRouteConfiguration& configProto() const { return config_proto_; }
Expand Down
20 changes: 12 additions & 8 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,17 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
for (const auto& scope_name : removed_resources) {
auto iter = scoped_route_map_.find(scope_name);
if (iter != scoped_route_map_.end()) {
ScopedRouteInfoConstSharedPtr to_be_deleted = iter->second;
scope_name_by_hash_.erase(iter->second->scopeKey().hash());
scoped_route_map_.erase(iter);
applyDeltaConfigUpdate([scope_name](const ConfigProvider::ConfigConstSharedPtr& config) {
auto* thread_local_scoped_config = const_cast<ThreadLocalScopedConfigImpl*>(
static_cast<const ThreadLocalScopedConfigImpl*>(config.get()));
thread_local_scoped_config->removeRoutingScope(scope_name);
});
applyDeltaConfigUpdate(
[scope_name](const ConfigProvider::ConfigConstSharedPtr& config) {
auto* thread_local_scoped_config = const_cast<ThreadLocalScopedConfigImpl*>(
static_cast<const ThreadLocalScopedConfigImpl*>(config.get()));
thread_local_scoped_config->removeRoutingScope(scope_name);
},
// We need to delete the associated RouteConfigProvider in main thread.
[to_be_deleted]() { /*to_be_deleted is destructed in main thread.*/ });
any_applied = true;
ENVOY_LOG(debug, "srds: remove scoped route '{}'", scope_name);
}
Expand All @@ -171,7 +175,7 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
absl::flat_hash_map<std::string, envoy::api::v2::ScopedRouteConfiguration> scoped_routes;
absl::flat_hash_map<uint64_t, std::string> scope_key_by_key_hash;
absl::flat_hash_map<uint64_t, std::string> scope_name_by_key_hash;
for (const auto& resource_any : resources) {
// Throws (thus rejects all) on any error.
auto scoped_route = MessageUtil::anyConvert<envoy::api::v2::ScopedRouteConfiguration>(
Expand All @@ -186,10 +190,10 @@ void ScopedRdsConfigSubscription::onConfigUpdate(
const envoy::api::v2::ScopedRouteConfiguration& scoped_route_config =
scope_config_inserted.first->second;
uint64_t key_fingerprint = MessageUtil::hash(scoped_route_config.key());
if (!scope_key_by_key_hash.try_emplace(key_fingerprint, scope_name).second) {
if (!scope_name_by_key_hash.try_emplace(key_fingerprint, scope_name).second) {
throw EnvoyException(
fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
scope_key_by_key_hash[key_fingerprint], scope_name));
scope_name_by_key_hash[key_fingerprint], scope_name));
}
}
ScopedRouteMap scoped_routes_to_remove = scoped_route_map_;
Expand Down
2 changes: 0 additions & 2 deletions source/common/router/scoped_rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,6 @@ class ScopedRoutesConfigProviderManager : public Envoy::Config::ConfigProviderMa

private:
RouteConfigProviderManager& route_config_provider_manager_;
// From ConfigSource fingerprint to shared RouteConfigProvider.
absl::flat_hash_map<std::string, std::shared_ptr<RouteConfigProvider>> cached_route_providers_;
};

// The optional argument passed to the ConfigProviderManager::create*() functions.
Expand Down
6 changes: 3 additions & 3 deletions test/common/config/config_provider_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,9 @@ TEST_F(DeltaConfigProviderImplTest, MultipleDeltaSubscriptions) {
// Issue a second config update to validate that having multiple providers bound to the
// subscription causes a single update to the underlying shared config implementation.
subscription.onConfigUpdate(untyped_dummy_configs, "2");
// NOTE: the config implementation is append only and _does not_ track updates/removals to the
// config proto set, so the expectation is to double the size of the set.
EXPECT_EQ(provider1->config<const ThreadLocalDummyConfig>()->numProtos(), 4);
// NOTE: the two providers share the same config, each config update propagation would add 2
// protos to the same config's proto vector, so the expectation is 6 here.
EXPECT_EQ(provider1->config<const ThreadLocalDummyConfig>()->numProtos(), 6);
EXPECT_EQ(provider1->configProtoInfoVector<test::common::config::DummyConfig>().value().version_,
"2");
}
Expand Down
13 changes: 11 additions & 2 deletions test/common/router/scoped_rds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ class ScopedRoutesTestBase : public testing::Test {
Event::SimulatedTimeSystem& timeSystem() { return time_system_; }

NiceMock<Server::Configuration::MockFactoryContext> factory_context_;
Upstream::ClusterManager::ClusterInfoMap cluster_map_;
Upstream::MockClusterMockPrioritySet cluster_;
std::unique_ptr<ScopedRoutesConfigProviderManager> config_provider_manager_;
MockRouteConfigProviderManager route_config_provider_manager_;
absl::flat_hash_map<std::string, std::shared_ptr<MockConfig>> cached_route_configs_;
Expand Down Expand Up @@ -253,6 +251,17 @@ route_configuration_name: foo_routes
// Partially reject.
1UL,
factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload").value());
// foo_scope update is applied.
EXPECT_EQ(dynamic_cast<ScopedRdsConfigProvider*>(provider_.get())
->subscription()
.scopedRouteMap()
.size(),
1UL);
EXPECT_EQ(dynamic_cast<ScopedRdsConfigProvider*>(provider_.get())
->subscription()
.scopedRouteMap()
.count("foo_scope"),
1);
}

// Tests that only one resource is provided during a config update.
Expand Down
4 changes: 2 additions & 2 deletions test/integration/scoped_rds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ScopedRdsIntegrationTest : public HttpIntegrationTest,
const std::string& scope_key_builder_config_yaml = R"EOF(
fragments:
- header_value_extractor:
name: X-Google-VIP
name: Addr
element:
key: x-foo-key
separator: ;
Expand Down Expand Up @@ -175,7 +175,7 @@ name: foo_scope2
route_configuration_name: foo_route1
key:
fragments:
- string_key: x-foo-key
- string_key: x-bar-key
)EOF";

const std::string route_config_tmpl = R"EOF(
Expand Down

0 comments on commit 50e8fb5

Please sign in to comment.