diff --git a/source/common/config/config_provider_impl.cc b/source/common/config/config_provider_impl.cc index ed209e8427ad..d10a74b96f0b 100644 --- a/source/common/config/config_provider_impl.cc +++ b/source/common/config/config_provider_impl.cc @@ -76,7 +76,7 @@ bool ConfigSubscriptionInstance::checkAndApplyConfigUpdate(const Protobuf::Messa } void DeltaConfigSubscriptionInstance::applyDeltaConfigUpdate( - const std::function& update_fn) { + const std::function& update_fn, Event::PostCb complete_cb) { // The Config implementation is assumed to be shared across the config providers bound to this // subscription, therefore, simply propagating the update to all worker threads for a single bound // provider will be sufficient. @@ -92,10 +92,19 @@ void DeltaConfigSubscriptionInstance::applyDeltaConfigUpdate( // needed. Such logic could be generalized as part of this framework such that this function owns // the diffing and issues the corresponding call to add/modify/remove a resource according to a // vector of functions passed by the caller. - auto* typed_provider = - static_cast(getAnyBoundMutableConfigProvider()); - ConfigSharedPtr config = typed_provider->getConfig(); - typed_provider->onConfigUpdate([config, update_fn]() { update_fn(config); }); + // For now each config provider has its own copy of config, we need to propagate the update to + // every provider. + for (auto* provider : mutable_config_providers_) { + auto* typed_provider = static_cast(provider); + typed_provider->onConfigUpdate( + [update_fn, typed_provider]() { + // Note: this lambda is run on every worker thread, getting the config from within the + // lambda ensures us getting the per-worker config. + ConfigSharedPtr config = typed_provider->getConfig(); + update_fn(config); + }, + std::move(complete_cb)); + } } ConfigProviderManagerImplBase::ConfigProviderManagerImplBase(Server::Admin& admin, diff --git a/source/common/config/config_provider_impl.h b/source/common/config/config_provider_impl.h index 2553b3976e59..29c4df358ec2 100644 --- a/source/common/config/config_provider_impl.h +++ b/source/common/config/config_provider_impl.h @@ -291,8 +291,11 @@ class DeltaConfigSubscriptionInstance : public ConfigSubscriptionCommonBase { * subscription. * * @param update_fn the callback to run on each worker thread. + * @param complete_cb the callback to run on each worker thread. NOTE it's called each time a + * registered provider's update propagation finishes. */ - void applyDeltaConfigUpdate(const std::function& update_fn); + void applyDeltaConfigUpdate(const std::function& update_fn, + Event::PostCb complete_cb = Event::PostCb()); }; /** @@ -409,10 +412,16 @@ class DeltaMutableConfigProviderBase : public MutableConfigProviderCommonBase { /** * Propagates a delta config update to all workers. - * @param updateCb the callback to run on each worker. + * @param update_cb the callback to run on each worker. + * @param complete_cb the callback to run in main thread after the update propagation is done on + * every worker thread. */ - void onConfigUpdate(Envoy::Event::PostCb update_cb) { - tls_->runOnAllThreads(std::move(update_cb)); + void onConfigUpdate(Envoy::Event::PostCb update_cb, Event::PostCb complete_cb) { + if (complete_cb) { + tls_->runOnAllThreads(std::move(update_cb), std::move(complete_cb)); + } else { + tls_->runOnAllThreads(std::move(update_cb)); + } } protected: diff --git a/source/common/router/scoped_config_impl.h b/source/common/router/scoped_config_impl.h index b569ed3b4f06..8e5fff1582f8 100644 --- a/source/common/router/scoped_config_impl.h +++ b/source/common/router/scoped_config_impl.h @@ -167,6 +167,8 @@ class ScopedRouteInfo { } } } + ~ScopedRouteInfo() = default; + Router::ConfigConstSharedPtr routeConfig() const { return route_provider_->config(); } const ScopeKey& scopeKey() const { return scope_key_; } const envoy::api::v2::ScopedRouteConfiguration& configProto() const { return config_proto_; } diff --git a/source/common/router/scoped_rds.cc b/source/common/router/scoped_rds.cc index a7a299bb1959..a53e8591119c 100644 --- a/source/common/router/scoped_rds.cc +++ b/source/common/router/scoped_rds.cc @@ -143,13 +143,17 @@ void ScopedRdsConfigSubscription::onConfigUpdate( for (const auto& scope_name : removed_resources) { auto iter = scoped_route_map_.find(scope_name); if (iter != scoped_route_map_.end()) { + ScopedRouteInfoConstSharedPtr to_be_deleted = iter->second; scope_name_by_hash_.erase(iter->second->scopeKey().hash()); scoped_route_map_.erase(iter); - applyDeltaConfigUpdate([scope_name](const ConfigProvider::ConfigConstSharedPtr& config) { - auto* thread_local_scoped_config = const_cast( - static_cast(config.get())); - thread_local_scoped_config->removeRoutingScope(scope_name); - }); + applyDeltaConfigUpdate( + [scope_name](const ConfigProvider::ConfigConstSharedPtr& config) { + auto* thread_local_scoped_config = const_cast( + static_cast(config.get())); + thread_local_scoped_config->removeRoutingScope(scope_name); + }, + // We need to delete the associated RouteConfigProvider in main thread. + [to_be_deleted]() { /*to_be_deleted is destructed in main thread.*/ }); any_applied = true; ENVOY_LOG(debug, "srds: remove scoped route '{}'", scope_name); } @@ -171,7 +175,7 @@ void ScopedRdsConfigSubscription::onConfigUpdate( const Protobuf::RepeatedPtrField& resources, const std::string& version_info) { absl::flat_hash_map scoped_routes; - absl::flat_hash_map scope_key_by_key_hash; + absl::flat_hash_map scope_name_by_key_hash; for (const auto& resource_any : resources) { // Throws (thus rejects all) on any error. auto scoped_route = MessageUtil::anyConvert( @@ -186,10 +190,10 @@ void ScopedRdsConfigSubscription::onConfigUpdate( const envoy::api::v2::ScopedRouteConfiguration& scoped_route_config = scope_config_inserted.first->second; uint64_t key_fingerprint = MessageUtil::hash(scoped_route_config.key()); - if (!scope_key_by_key_hash.try_emplace(key_fingerprint, scope_name).second) { + if (!scope_name_by_key_hash.try_emplace(key_fingerprint, scope_name).second) { throw EnvoyException( fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'", - scope_key_by_key_hash[key_fingerprint], scope_name)); + scope_name_by_key_hash[key_fingerprint], scope_name)); } } ScopedRouteMap scoped_routes_to_remove = scoped_route_map_; diff --git a/source/common/router/scoped_rds.h b/source/common/router/scoped_rds.h index 99e981d8ece4..45948d0ffbed 100644 --- a/source/common/router/scoped_rds.h +++ b/source/common/router/scoped_rds.h @@ -207,8 +207,6 @@ class ScopedRoutesConfigProviderManager : public Envoy::Config::ConfigProviderMa private: RouteConfigProviderManager& route_config_provider_manager_; - // From ConfigSource fingerprint to shared RouteConfigProvider. - absl::flat_hash_map> cached_route_providers_; }; // The optional argument passed to the ConfigProviderManager::create*() functions. diff --git a/test/common/config/config_provider_impl_test.cc b/test/common/config/config_provider_impl_test.cc index 2b3e320a7a62..8dcf22c39527 100644 --- a/test/common/config/config_provider_impl_test.cc +++ b/test/common/config/config_provider_impl_test.cc @@ -737,9 +737,9 @@ TEST_F(DeltaConfigProviderImplTest, MultipleDeltaSubscriptions) { // Issue a second config update to validate that having multiple providers bound to the // subscription causes a single update to the underlying shared config implementation. subscription.onConfigUpdate(untyped_dummy_configs, "2"); - // NOTE: the config implementation is append only and _does not_ track updates/removals to the - // config proto set, so the expectation is to double the size of the set. - EXPECT_EQ(provider1->config()->numProtos(), 4); + // NOTE: the two providers share the same config, each config update propagation would add 2 + // protos to the same config's proto vector, so the expectation is 6 here. + EXPECT_EQ(provider1->config()->numProtos(), 6); EXPECT_EQ(provider1->configProtoInfoVector().value().version_, "2"); } diff --git a/test/common/router/scoped_rds_test.cc b/test/common/router/scoped_rds_test.cc index 686f601f03a7..f250ce4d06ba 100644 --- a/test/common/router/scoped_rds_test.cc +++ b/test/common/router/scoped_rds_test.cc @@ -92,8 +92,6 @@ class ScopedRoutesTestBase : public testing::Test { Event::SimulatedTimeSystem& timeSystem() { return time_system_; } NiceMock factory_context_; - Upstream::ClusterManager::ClusterInfoMap cluster_map_; - Upstream::MockClusterMockPrioritySet cluster_; std::unique_ptr config_provider_manager_; MockRouteConfigProviderManager route_config_provider_manager_; absl::flat_hash_map> cached_route_configs_; @@ -253,6 +251,17 @@ route_configuration_name: foo_routes // Partially reject. 1UL, factory_context_.scope_.counter("foo.scoped_rds.foo_scoped_routes.config_reload").value()); + // foo_scope update is applied. + EXPECT_EQ(dynamic_cast(provider_.get()) + ->subscription() + .scopedRouteMap() + .size(), + 1UL); + EXPECT_EQ(dynamic_cast(provider_.get()) + ->subscription() + .scopedRouteMap() + .count("foo_scope"), + 1); } // Tests that only one resource is provided during a config update. diff --git a/test/integration/scoped_rds_integration_test.cc b/test/integration/scoped_rds_integration_test.cc index 13f63fb1e42a..2c294366ce3b 100644 --- a/test/integration/scoped_rds_integration_test.cc +++ b/test/integration/scoped_rds_integration_test.cc @@ -49,7 +49,7 @@ class ScopedRdsIntegrationTest : public HttpIntegrationTest, const std::string& scope_key_builder_config_yaml = R"EOF( fragments: - header_value_extractor: - name: X-Google-VIP + name: Addr element: key: x-foo-key separator: ; @@ -175,7 +175,7 @@ name: foo_scope2 route_configuration_name: foo_route1 key: fragments: - - string_key: x-foo-key + - string_key: x-bar-key )EOF"; const std::string route_config_tmpl = R"EOF(