Skip to content

Commit

Permalink
test infra: Remove timeSource() from the ClusterManager api (#4247)
Browse files Browse the repository at this point in the history
Follow-up to #4180 -- removes ClusterManager::timeSource() as an API. Another step toward resolving #4160

Risk Level: low
Testing: //test/...
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Joshua Marantz <jmarantz@google.com>
  • Loading branch information
jmarantz authored and htuch committed Aug 28, 2018
1 parent cd171d9 commit c987b42
Show file tree
Hide file tree
Showing 21 changed files with 89 additions and 61 deletions.
7 changes: 4 additions & 3 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ namespace Envoy {
namespace Grpc {

AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm,
const envoy::api::v2::core::GrpcService& config)
const envoy::api::v2::core::GrpcService& config,
TimeSource& time_source)
: cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()),
initial_metadata_(config.initial_metadata()) {}
initial_metadata_(config.initial_metadata()), time_source_(time_source) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_streams_.empty()) {
Expand Down Expand Up @@ -207,7 +208,7 @@ AsyncRequestImpl::AsyncRequestImpl(AsyncClientImpl& parent,

current_span_ = parent_span.spawnChild(Tracing::EgressConfig::get(),
"async " + parent.remote_cluster_name_ + " egress",
parent.timeSource().systemTime());
parent.time_source_.systemTime());
current_span_->setTag(Tracing::Tags::get().UPSTREAM_CLUSTER, parent.remote_cluster_name_);
current_span_->setTag(Tracing::Tags::get().COMPONENT, Tracing::Tags::get().PROXY);
}
Expand Down
6 changes: 3 additions & 3 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class AsyncStreamImpl;

class AsyncClientImpl final : public AsyncClient {
public:
AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::api::v2::core::GrpcService& config);
AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::api::v2::core::GrpcService& config,
TimeSource& time_source);
~AsyncClientImpl() override;

// Grpc::AsyncClient
Expand All @@ -25,13 +26,12 @@ class AsyncClientImpl final : public AsyncClient {
AsyncStream* start(const Protobuf::MethodDescriptor& service_method,
AsyncStreamCallbacks& callbacks) override;

TimeSource& timeSource() { return cm_.timeSource(); }

private:
Upstream::ClusterManager& cm_;
const std::string remote_cluster_name_;
const Protobuf::RepeatedPtrField<envoy::api::v2::core::HeaderValue> initial_metadata_;
std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
TimeSource& time_source_;

friend class AsyncRequestImpl;
friend class AsyncStreamImpl;
Expand Down
13 changes: 7 additions & 6 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Grpc {

AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
const envoy::api::v2::core::GrpcService& config,
bool skip_cluster_check)
: cm_(cm), config_(config) {
bool skip_cluster_check, TimeSource& time_source)
: cm_(cm), config_(config), time_source_(time_source) {
if (skip_cluster_check) {
return;
}
Expand All @@ -31,8 +31,8 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
}

AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm,
ThreadLocal::Instance& tls)
: cm_(cm), tls_(tls) {
ThreadLocal::Instance& tls, TimeSource& time_source)
: cm_(cm), tls_(tls), time_source_(time_source) {
#ifdef ENVOY_GOOGLE_GRPC
google_tls_slot_ = tls.allocateSlot();
google_tls_slot_->set(
Expand All @@ -41,7 +41,7 @@ AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm,
}

AsyncClientPtr AsyncClientFactoryImpl::create() {
return std::make_unique<AsyncClientImpl>(cm_, config_);
return std::make_unique<AsyncClientImpl>(cm_, config_, time_source_);
}

GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
Expand All @@ -50,6 +50,7 @@ GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
: tls_(tls), google_tls_slot_(google_tls_slot),
scope_(scope.createScope(fmt::format("grpc.{}.", config.google_grpc().stat_prefix()))),
config_(config) {

#ifndef ENVOY_GOOGLE_GRPC
UNREFERENCED_PARAMETER(tls_);
UNREFERENCED_PARAMETER(google_tls_slot_);
Expand Down Expand Up @@ -77,7 +78,7 @@ AsyncClientManagerImpl::factoryForGrpcService(const envoy::api::v2::core::GrpcSe
Stats::Scope& scope, bool skip_cluster_check) {
switch (config.target_specifier_case()) {
case envoy::api::v2::core::GrpcService::kEnvoyGrpc:
return std::make_unique<AsyncClientFactoryImpl>(cm_, config, skip_cluster_check);
return std::make_unique<AsyncClientFactoryImpl>(cm_, config, skip_cluster_check, time_source_);
case envoy::api::v2::core::GrpcService::kGoogleGrpc:
return std::make_unique<GoogleAsyncClientFactoryImpl>(tls_, google_tls_slot_.get(), scope,
config);
Expand Down
8 changes: 6 additions & 2 deletions source/common/grpc/async_client_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ namespace Grpc {
class AsyncClientFactoryImpl : public AsyncClientFactory {
public:
AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
const envoy::api::v2::core::GrpcService& config, bool skip_cluster_check);
const envoy::api::v2::core::GrpcService& config, bool skip_cluster_check,
TimeSource& time_source);

AsyncClientPtr create() override;

private:
Upstream::ClusterManager& cm_;
const envoy::api::v2::core::GrpcService config_;
TimeSource& time_source_;
};

class GoogleAsyncClientFactoryImpl : public AsyncClientFactory {
Expand All @@ -38,7 +40,8 @@ class GoogleAsyncClientFactoryImpl : public AsyncClientFactory {

class AsyncClientManagerImpl : public AsyncClientManager {
public:
AsyncClientManagerImpl(Upstream::ClusterManager& cm, ThreadLocal::Instance& tls);
AsyncClientManagerImpl(Upstream::ClusterManager& cm, ThreadLocal::Instance& tls,
TimeSource& time_source);

// Grpc::AsyncClientManager
AsyncClientFactoryPtr factoryForGrpcService(const envoy::api::v2::core::GrpcService& config,
Expand All @@ -49,6 +52,7 @@ class AsyncClientManagerImpl : public AsyncClientManager {
Upstream::ClusterManager& cm_;
ThreadLocal::Instance& tls_;
ThreadLocal::SlotPtr google_tls_slot_;
TimeSource& time_source_;
};

} // namespace Grpc
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St
Upstream::ClusterManager& cm, Runtime::Loader& runtime,
Runtime::RandomGenerator& random,
Router::ShadowWriterPtr&& shadow_writer)
: cluster_(cluster), config_("http.async-client.", local_info, stats_store, cm, runtime, random,
std::move(shadow_writer), true, false, false),
: cluster_(cluster),
config_("http.async-client.", local_info, stats_store, cm, runtime, random,
std::move(shadow_writer), true, false, false, dispatcher.timeSource()),
dispatcher_(dispatcher) {}

AsyncClientImpl::~AsyncClientImpl() {
Expand Down
12 changes: 8 additions & 4 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,29 @@ class FilterConfig {
FilterConfig(const std::string& stat_prefix, const LocalInfo::LocalInfo& local_info,
Stats::Scope& scope, Upstream::ClusterManager& cm, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, ShadowWriterPtr&& shadow_writer,
bool emit_dynamic_stats, bool start_child_span, bool suppress_envoy_headers)
bool emit_dynamic_stats, bool start_child_span, bool suppress_envoy_headers,
TimeSource& time_source)
: scope_(scope), local_info_(local_info), cm_(cm), runtime_(runtime),
random_(random), stats_{ALL_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix))},
emit_dynamic_stats_(emit_dynamic_stats), start_child_span_(start_child_span),
suppress_envoy_headers_(suppress_envoy_headers), shadow_writer_(std::move(shadow_writer)) {}
suppress_envoy_headers_(suppress_envoy_headers), shadow_writer_(std::move(shadow_writer)),
time_source_(time_source) {}

FilterConfig(const std::string& stat_prefix, Server::Configuration::FactoryContext& context,
ShadowWriterPtr&& shadow_writer,
const envoy::config::filter::http::router::v2::Router& config)
: FilterConfig(stat_prefix, context.localInfo(), context.scope(), context.clusterManager(),
context.runtime(), context.random(), std::move(shadow_writer),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true),
config.start_child_span(), config.suppress_envoy_headers()) {
config.start_child_span(), config.suppress_envoy_headers(),
context.timeSource()) {
for (const auto& upstream_log : config.upstream_log()) {
upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context));
}
}

ShadowWriter& shadowWriter() { return *shadow_writer_; }
TimeSource& timeSource() { return cm_.timeSource(); }
TimeSource& timeSource() { return time_source_; }

Stats::Scope& scope_;
const LocalInfo::LocalInfo& local_info_;
Expand All @@ -130,6 +133,7 @@ class FilterConfig {

private:
ShadowWriterPtr shadow_writer_;
TimeSource& time_source_;
};

typedef std::shared_ptr<FilterConfig> FilterConfigSharedPtr;
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
config_tracker_entry_(
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })),
time_source_(main_thread_dispatcher.timeSource()), dispatcher_(main_thread_dispatcher) {
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls);
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls, time_source_);
const auto& cm_config = bootstrap.cluster_manager();
if (cm_config.has_outlier_detection()) {
const std::string event_log_file_path = cm_config.outlier_detection().event_log_path();
Expand Down Expand Up @@ -305,7 +305,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, load_stats_config, stats)
->create(),
main_thread_dispatcher, timeSource().monotonic()));
main_thread_dispatcher));
}
}

Expand Down
9 changes: 4 additions & 5 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ namespace Upstream {
LoadStatsReporter::LoadStatsReporter(const LocalInfo::LocalInfo& local_info,
ClusterManager& cluster_manager, Stats::Scope& scope,
Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher,
MonotonicTimeSource& time_source)
Event::Dispatcher& dispatcher)
: cm_(cluster_manager), stats_{ALL_LOAD_REPORTER_STATS(
POOL_COUNTER_PREFIX(scope, "load_reporter."))},
async_client_(std::move(async_client)),
service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.load_stats.v2.LoadReportingService.StreamLoadStats")),
time_source_(time_source) {
time_source_(dispatcher.timeSource()) {
request_.mutable_node()->MergeFrom(local_info.node());
retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
response_timer_ = dispatcher.createTimer([this]() -> void { sendLoadStatsRequest(); });
Expand Down Expand Up @@ -78,7 +77,7 @@ void LoadStatsReporter::sendLoadStatsRequest() {
}
cluster_stats->set_total_dropped_requests(
cluster.info()->loadReportStats().upstream_rq_dropped_.latch());
const auto now = time_source_.currentTime().time_since_epoch();
const auto now = time_source_.monotonicTime().time_since_epoch();
const auto measured_interval = now - cluster_name_and_timestamp.second;
cluster_stats->mutable_load_report_interval()->MergeFrom(
Protobuf::util::TimeUtil::MicrosecondsToDuration(
Expand Down Expand Up @@ -137,7 +136,7 @@ void LoadStatsReporter::startLoadReportPeriod() {
for (const std::string& cluster_name : message_->clusters()) {
clusters_.emplace(cluster_name, existing_clusters.count(cluster_name) > 0
? existing_clusters[cluster_name]
: time_source_.currentTime().time_since_epoch());
: time_source_.monotonicTime().time_since_epoch());
auto cluster_info_map = cm_.clusters();
auto it = cluster_info_map.find(cluster_name);
if (it == cluster_info_map.end()) {
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/load_stats_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LoadStatsReporter
public:
LoadStatsReporter(const LocalInfo::LocalInfo& local_info, ClusterManager& cluster_manager,
Stats::Scope& scope, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher, MonotonicTimeSource& time_source);
Event::Dispatcher& dispatcher);

// Grpc::TypedAsyncStreamCallbacks
void onCreateInitialMetadata(Http::HeaderMap& metadata) override;
Expand Down Expand Up @@ -66,7 +66,7 @@ class LoadStatsReporter
std::unique_ptr<envoy::service::load_stats::v2::LoadStatsResponse> message_;
// Map from cluster name to start of measurement interval.
std::unordered_map<std::string, std::chrono::steady_clock::duration> clusters_;
MonotonicTimeSource& time_source_;
TimeSource& time_source_;
};

typedef std::unique_ptr<LoadStatsReporter> LoadStatsReporterPtr;
Expand Down
6 changes: 3 additions & 3 deletions source/extensions/tracers/zipkin/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Tracing::HttpTracerPtr ZipkinTracerFactory::createHttpTracer(const Json::Object&

Envoy::Runtime::RandomGenerator& rand = server.random();

Tracing::DriverPtr zipkin_driver(new Zipkin::Driver(json_config, server.clusterManager(),
server.stats(), server.threadLocal(),
server.runtime(), server.localInfo(), rand));
Tracing::DriverPtr zipkin_driver(
new Zipkin::Driver(json_config, server.clusterManager(), server.stats(), server.threadLocal(),
server.runtime(), server.localInfo(), rand, server.timeSource()));

return Tracing::HttpTracerPtr(
new Tracing::HttpTracerImpl(std::move(zipkin_driver), server.localInfo()));
Expand Down
8 changes: 5 additions & 3 deletions source/extensions/tracers/zipkin/zipkin_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ Driver::TlsTracer::TlsTracer(TracerPtr&& tracer, Driver& driver)

Driver::Driver(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats, ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime,
const LocalInfo::LocalInfo& local_info, Runtime::RandomGenerator& random_generator)
const LocalInfo::LocalInfo& local_info, Runtime::RandomGenerator& random_generator,
TimeSource& time_source)
: cm_(cluster_manager), tracer_stats_{ZIPKIN_TRACER_STATS(
POOL_COUNTER_PREFIX(stats, "tracing.zipkin."))},
tls_(tls.allocateSlot()), runtime_(runtime), local_info_(local_info) {
tls_(tls.allocateSlot()), runtime_(runtime), local_info_(local_info),
time_source_(time_source) {

Upstream::ThreadLocalCluster* cluster = cm_.get(config.getString("collector_cluster"));
if (!cluster) {
Expand All @@ -76,7 +78,7 @@ Driver::Driver(const Json::Object& config, Upstream::ClusterManager& cluster_man
tls_->set([this, collector_endpoint, &random_generator, trace_id_128bit](
Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
TracerPtr tracer(new Tracer(local_info_.clusterName(), local_info_.address(), random_generator,
trace_id_128bit, cm_.timeSource()));
trace_id_128bit, time_source_));
tracer->setReporter(
ReporterImpl::NewInstance(std::ref(*this), std::ref(dispatcher), collector_endpoint));
return ThreadLocal::ThreadLocalObjectSharedPtr{new TlsTracer(std::move(tracer), *this)};
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/tracers/zipkin/zipkin_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class Driver : public Tracing::Driver {
*/
Driver(const Json::Object& config, Upstream::ClusterManager& cluster_manager, Stats::Store& stats,
ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime,
const LocalInfo::LocalInfo& localinfo, Runtime::RandomGenerator& random_generator);
const LocalInfo::LocalInfo& localinfo, Runtime::RandomGenerator& random_generator,
TimeSource& time_source);

/**
* This function is inherited from the abstract Driver class.
Expand Down Expand Up @@ -130,6 +131,7 @@ class Driver : public Tracing::Driver {
ThreadLocal::SlotPtr tls_;
Runtime::Loader& runtime_;
const LocalInfo::LocalInfo& local_info_;
TimeSource& time_source_;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ void InstanceImpl::initialize(Options& options,

if (bootstrap_.has_hds_config()) {
const auto& hds_config = bootstrap_.hds_config();
async_client_manager_ =
std::make_unique<Grpc::AsyncClientManagerImpl>(clusterManager(), thread_local_);
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
clusterManager(), thread_local_, time_source_);
hds_delegate_.reset(new Upstream::HdsDelegate(
bootstrap_.node(), stats(),
Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, hds_config, stats())
Expand Down
1 change: 1 addition & 0 deletions test/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ envoy_cc_test(
"//test/mocks/tracing:tracing_mocks",
"//test/mocks/upstream:upstream_mocks",
"//test/proto:helloworld_proto_cc",
"//test/test_common:test_time_lib",
],
)

Expand Down
4 changes: 3 additions & 1 deletion test/common/grpc/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/mocks.h"
#include "test/proto/helloworld.pb.h"
#include "test/test_common/test_time.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
Expand All @@ -24,14 +25,15 @@ class EnvoyAsyncClientImplTest : public testing::Test {
: method_descriptor_(helloworld::Greeter::descriptor()->FindMethodByName("SayHello")) {
envoy::api::v2::core::GrpcService config;
config.mutable_envoy_grpc()->set_cluster_name("test_cluster");
grpc_client_ = std::make_unique<AsyncClientImpl>(cm_, config);
grpc_client_ = std::make_unique<AsyncClientImpl>(cm_, config, test_time_.timeSource());
ON_CALL(cm_, httpAsyncClientForCluster("test_cluster")).WillByDefault(ReturnRef(http_client_));
}

const Protobuf::MethodDescriptor* method_descriptor_;
NiceMock<Http::MockAsyncClient> http_client_;
NiceMock<Upstream::MockClusterManager> cm_;
std::unique_ptr<AsyncClientImpl> grpc_client_;
DangerousDeprecatedTestTime test_time_;
};

// Validate that a failure in the HTTP client returns immediately with status
Expand Down
Loading

0 comments on commit c987b42

Please sign in to comment.