Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Endpoint lease for ClusterLoadAssignment #6477

Merged
merged 15 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/envoy/api/v2/eds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "google/api/annotations.proto";
import "validate/validate.proto";
import "gogoproto/gogo.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";

option (gogoproto.equal_all) = true;
option (gogoproto.stable_marshaler_all) = true;
Expand Down Expand Up @@ -107,6 +108,12 @@ message ClusterLoadAssignment {
// Read more at :ref:`priority levels <arch_overview_load_balancing_priority_levels>` and
// :ref:`localities <arch_overview_load_balancing_locality_weighted_lb>`.
google.protobuf.UInt32Value overprovisioning_factor = 3 [(validate.rules).uint32.gt = 0];

// The maximum time for which the endpoints from this assignment can be used.
// If no new assignment is received before this time expires, the endpoints
// are considered stale and should be marked unhealthy.
// Defaults to 0, which means the endpoints never go stale.
google.protobuf.Duration endpoint_stale_after = 4 [(validate.rules).duration.gt.seconds = 0];
}

// Load balancing policy settings.
Expand Down
2 changes: 2 additions & 0 deletions docs/root/configuration/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Every cluster has a statistics tree rooted at *cluster.<name>.* with the followi
version, Gauge, Hash of the contents from the last successful API fetch
max_host_weight, Gauge, Maximum weight of any host in the cluster
bind_errors, Counter, Total errors binding the socket to the configured source address
assignment_timeout_received, Counter, Total assignments received with endpoint lease information.
assignment_stale, Counter, Number of times the received assignments went stale before new assignments arrived.

Health check statistics
-----------------------
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Version history
================
* dubbo_proxy: support the :ref:`Dubbo proxy filter <config_network_filters_dubbo_proxy>`.
* event: added :ref:`loop duration and poll delay statistics <operations_performance>`.
* eds: added support to specify the max time for which endpoints can be used via the :ref:`ClusterLoadAssignment policy <envoy_api_msg_ClusterLoadAssignment.Policy>`.
* http: mitigated a race condition with the :ref:`delayed_close_timeout<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.delayed_close_timeout>` where it could trigger while actively flushing a pending write buffer for a downstream connection.
* redis: added :ref:`prefix routing <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.prefix_routes>` to enable routing commands based on their key's prefix to different upstream.
* redis: add support for zpopmax and zpopmin commands.
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ class PrioritySet {
COUNTER (update_failure) \
COUNTER (update_empty) \
COUNTER (update_no_rebuild) \
COUNTER (assignment_timeout_received) \
COUNTER (assignment_stale) \
vishalpowar marked this conversation as resolved.
Show resolved Hide resolved
GAUGE (version)
// clang-format on

Expand Down
32 changes: 30 additions & 2 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/api/v2/eds.pb.validate.h"

#include "common/common/utility.h"
#include "common/config/subscription_factory.h"

namespace Envoy {
Expand All @@ -18,11 +19,11 @@ EdsClusterImpl::EdsClusterImpl(
? cluster.name()
: cluster.eds_cluster_config().service_name()) {
Config::Utility::checkLocalInfo("eds", local_info_);

const auto& eds_config = cluster.eds_cluster_config().eds_config();
Event::Dispatcher& dispatcher = factory_context.dispatcher();
Runtime::RandomGenerator& random = factory_context.random();
Upstream::ClusterManager& cm = factory_context.clusterManager();
assignment_timeout_ = dispatcher.createTimer([this]() -> void { onAssignmentTimeout(); });
const auto& eds_config = cluster.eds_cluster_config().eds_config();
subscription_ = Config::SubscriptionFactory::subscriptionFromConfigSource(
eds_config, local_info_, dispatcher, cm, random, info_->statsScope(),
"envoy.api.v2.EndpointDiscoveryService.FetchEndpoints",
Expand Down Expand Up @@ -118,10 +119,37 @@ void EdsClusterImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt
cluster_load_assignment.cluster_name()));
}

// Disable timer (if enabled) as we have received new assignment.
if (assignment_timeout_->enabled()) {
assignment_timeout_->disableTimer();
}
// Check if endpoint_stale_after is set.
const uint64_t stale_after_ms =
PROTOBUF_GET_MS_OR_DEFAULT(cluster_load_assignment.policy(), endpoint_stale_after, 0);
if (stale_after_ms > 0) {
// Stat to track how often we receive valid assignment_timeout in response.
info_->stats().assignment_timeout_received_.inc();
assignment_timeout_->enableTimer(std::chrono::milliseconds(stale_after_ms));
}

BatchUpdateHelper helper(*this, cluster_load_assignment);
priority_set_.batchHostUpdate(helper);
}

void EdsClusterImpl::onAssignmentTimeout() {
// We can no longer use the assignments, remove them.
// TODO(vishalpowar) This is not going to work for incremental updates, and we
// need to instead change the health status to indicate the assignments are
// stale.
Protobuf::RepeatedPtrField<ProtobufWkt::Any> resources;
envoy::api::v2::ClusterLoadAssignment resource;
resource.set_cluster_name(cluster_name_);
resources.Add()->PackFrom(resource);
onConfigUpdate(resources, "");
// Stat to track how often we end up with stale assignments.
info_->stats().assignment_stale_.inc();
}

bool EdsClusterImpl::updateHostsPerLocality(
const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts,
LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map,
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba

// ClusterImplBase
void startPreInit() override;
void onAssignmentTimeout();

class BatchUpdateHelper : public PrioritySet::BatchUpdateCb {
public:
Expand All @@ -74,6 +75,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
const std::string cluster_name_;
std::vector<LocalityWeightsMap> locality_weights_map_;
HostMap all_hosts_;
Event::TimerPtr assignment_timeout_;
};

class EdsClusterFactory : public ClusterFactoryImplBase {
Expand Down
102 changes: 102 additions & 0 deletions test/common/upstream/eds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "gtest/gtest.h"

using testing::_;
using testing::AtLeast;
using testing::Return;
using testing::ReturnRef;

Expand Down Expand Up @@ -1430,6 +1431,107 @@ TEST_F(EdsTest, MalformedIP) {
"setting cluster type to 'STRICT_DNS' or 'LOGICAL_DNS'");
}

class EdsAssignmentTimeoutTest : public EdsTest {
public:
EdsAssignmentTimeoutTest() : EdsTest(), interval_timer_(nullptr) {
EXPECT_CALL(dispatcher_, createTimer_(_))
.WillOnce(Invoke([this](Event::TimerCb cb) {
timer_cb_ = cb;
EXPECT_EQ(nullptr, interval_timer_);
interval_timer_ = new Event::MockTimer();
return interval_timer_;
}))
.WillRepeatedly(Invoke([](Event::TimerCb) { return new Event::MockTimer(); }));
vishalpowar marked this conversation as resolved.
Show resolved Hide resolved

resetCluster();
}

Event::MockTimer* interval_timer_;
Event::TimerCb timer_cb_;
};

// Test that assignment timeout is enabled and disabled correctly.
TEST_F(EdsAssignmentTimeoutTest, AssignmentTimeoutEnableDisable) {
  // Baseline assignment for cluster "fare" that carries no lease.
  envoy::api::v2::ClusterLoadAssignment plain_assignment;
  plain_assignment.set_cluster_name("fare");
  auto* locality_endpoints = plain_assignment.add_endpoints();

  auto health_checker = std::make_shared<MockHealthChecker>();
  EXPECT_CALL(*health_checker, start());
  EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
  cluster_->setHealthChecker(health_checker);

  // Single endpoint at 1.2.3.4:80.
  auto* lb_endpoint = locality_endpoints->add_lb_endpoints();
  auto* address = lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address();
  address->set_address("1.2.3.4");
  address->set_port_value(80);

  // Same assignment, but with a one second endpoint_stale_after lease attached.
  envoy::api::v2::ClusterLoadAssignment leased_assignment(plain_assignment);
  auto* stale_after = leased_assignment.mutable_policy()->mutable_endpoint_stale_after();
  stale_after->MergeFrom(Protobuf::util::TimeUtil::SecondsToDuration(1));

  EXPECT_CALL(*interval_timer_, enableTimer(_)).Times(2); // Timer enabled twice.
  EXPECT_CALL(*interval_timer_, disableTimer()).Times(1); // Timer disabled once.
  EXPECT_CALL(*interval_timer_, enabled()).Times(6);      // Includes calls by test.

  // Leased update: the timer must end up enabled.
  doOnConfigUpdateVerifyNoThrow(leased_assignment);
  EXPECT_TRUE(interval_timer_->enabled());

  // Update without a lease: the timer must end up disabled.
  doOnConfigUpdateVerifyNoThrow(plain_assignment);
  EXPECT_FALSE(interval_timer_->enabled());

  // Leased update again: the timer must be enabled once more.
  doOnConfigUpdateVerifyNoThrow(leased_assignment);
  EXPECT_TRUE(interval_timer_->enabled());
}

// Test that assignment timeout is called and removes all the endpoints.
vishalpowar marked this conversation as resolved.
Show resolved Hide resolved
TEST_F(EdsAssignmentTimeoutTest, AssignmentLeaseExpired) {
  // Assignment for cluster "fare" with a one second lease.
  envoy::api::v2::ClusterLoadAssignment leased_assignment;
  leased_assignment.set_cluster_name("fare");
  auto* stale_after = leased_assignment.mutable_policy()->mutable_endpoint_stale_after();
  stale_after->MergeFrom(Protobuf::util::TimeUtil::SecondsToDuration(1));

  auto health_checker = std::make_shared<MockHealthChecker>();
  EXPECT_CALL(*health_checker, start());
  EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
  cluster_->setHealthChecker(health_checker);

  // Appends a new locality holding one endpoint at 1.2.3.4:<port>.
  auto add_endpoint = [&leased_assignment](int port) {
    auto* lb_endpoint = leased_assignment.add_endpoints()->add_lb_endpoints();
    auto* address = lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address();
    address->set_address("1.2.3.4");
    address->set_port_value(port);
  };

  // Number of hosts currently present at priority 0.
  const auto host_count = [this]() {
    return cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size();
  };

  // Add two endpoints to the cluster assignment.
  add_endpoint(80);
  add_endpoint(81);

  // Expect the timer to be enabled once.
  EXPECT_CALL(*interval_timer_, enableTimer(std::chrono::milliseconds(1000)));
  // Expect the timer to be disabled when stale assignments are removed.
  EXPECT_CALL(*interval_timer_, disableTimer());
  EXPECT_CALL(*interval_timer_, enabled()).Times(2);

  doOnConfigUpdateVerifyNoThrow(leased_assignment);
  EXPECT_EQ(host_count(), 2);

  // Call the timer callback to indicate timeout.
  timer_cb_();

  // Test that stale endpoints are removed.
  EXPECT_EQ(host_count(), 0);
}

} // namespace
} // namespace Upstream
} // namespace Envoy
4 changes: 2 additions & 2 deletions test/integration/stats_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithStats) {

EXPECT_LT(start_mem, m1);
EXPECT_LT(start_mem, m1001);
// As of 2019/03/20, m_per_cluster = 59015 (libstdc++)
EXPECT_LT(m_per_cluster, 59100);
// As of 2019/04/12, m_per_cluster = 59576 (libstdc++)
EXPECT_LT(m_per_cluster, 59600);
}

} // namespace
Expand Down