From 8bdebbfb936c533a822df9e96fd37be2cbdbab94 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Mon, 26 Aug 2019 16:07:39 -0700 Subject: [PATCH] accesslog: implement TCP gRPC access logger (#7941) Description: Initial implementation for TCP gRPC access logger. Risk Level: Low (extension only) Testing: integration test Docs Changes: Added Release Notes: Added Signed-off-by: Lizan Zhou --- api/envoy/config/accesslog/v2/als.proto | 1 - .../filter/accesslog/v2/accesslog.proto | 3 + api/envoy/data/accesslog/v2/accesslog.proto | 13 +- api/envoy/service/accesslog/v2/als.proto | 2 - docs/root/intro/version_history.rst | 1 + source/extensions/access_loggers/grpc/BUILD | 40 +++- .../access_loggers/grpc/config_utils.cc | 25 +++ .../access_loggers/grpc/config_utils.h | 18 ++ .../grpc/grpc_access_log_impl.cc | 18 +- .../grpc/grpc_access_log_impl.h | 23 ++- .../grpc/grpc_access_log_proto_descriptors.cc | 4 +- .../grpc/grpc_access_log_proto_descriptors.h | 4 +- .../grpc/grpc_access_log_utils.cc | 3 +- .../access_loggers/grpc/http_config.cc | 22 +- .../access_loggers/grpc/http_config.h | 2 - .../grpc/http_grpc_access_log_impl.cc | 4 +- .../access_loggers/grpc/tcp_config.cc | 51 +++++ .../access_loggers/grpc/tcp_config.h | 29 +++ .../grpc/tcp_grpc_access_log_impl.cc | 48 +++++ .../grpc/tcp_grpc_access_log_impl.h | 60 ++++++ .../access_loggers/well_known_names.h | 2 + source/extensions/extensions_build_config.bzl | 1 + test/extensions/access_loggers/grpc/BUILD | 17 ++ .../grpc/grpc_access_log_impl_test.cc | 15 +- .../grpc/http_grpc_access_log_impl_test.cc | 21 +- .../tcp_grpc_access_log_integration_test.cc | 189 ++++++++++++++++++ 26 files changed, 564 insertions(+), 52 deletions(-) create mode 100644 source/extensions/access_loggers/grpc/config_utils.cc create mode 100644 source/extensions/access_loggers/grpc/config_utils.h create mode 100644 source/extensions/access_loggers/grpc/tcp_config.cc create mode 100644 source/extensions/access_loggers/grpc/tcp_config.h create mode 100644 source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc create mode 100644 source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h create mode 100644 test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc diff --git a/api/envoy/config/accesslog/v2/als.proto b/api/envoy/config/accesslog/v2/als.proto index a7291e4e9780..9d83ebfcfb91 100644 --- a/api/envoy/config/accesslog/v2/als.proto +++ b/api/envoy/config/accesslog/v2/als.proto @@ -38,7 +38,6 @@ message HttpGrpcAccessLogConfig { // Configuration for the built-in *envoy.tcp_grpc_access_log* type. This configuration will // populate *StreamAccessLogsMessage.tcp_logs*. -// [#not-implemented-hide:] message TcpGrpcAccessLogConfig { CommonGrpcAccessLogConfig common_config = 1 [(validate.rules).message.required = true]; } diff --git a/api/envoy/config/filter/accesslog/v2/accesslog.proto b/api/envoy/config/filter/accesslog/v2/accesslog.proto index 66d901c2d413..76fc4baf80c7 100644 --- a/api/envoy/config/filter/accesslog/v2/accesslog.proto +++ b/api/envoy/config/filter/accesslog/v2/accesslog.proto @@ -24,6 +24,7 @@ message AccessLog { // // #. "envoy.file_access_log" // #. "envoy.http_grpc_access_log" + // #. "envoy.tcp_grpc_access_log" string name = 1; // Filter which is used to determine if the access log needs to be written. @@ -36,6 +37,8 @@ message AccessLog { // ` // #. "envoy.http_grpc_access_log": :ref:`HttpGrpcAccessLogConfig // ` + // #. "envoy.tcp_grpc_access_log": :ref:`TcpGrpcAccessLogConfig + // ` oneof config_type { google.protobuf.Struct config = 3; diff --git a/api/envoy/data/accesslog/v2/accesslog.proto b/api/envoy/data/accesslog/v2/accesslog.proto index 1396c729f88e..7309d4a362a6 100644 --- a/api/envoy/data/accesslog/v2/accesslog.proto +++ b/api/envoy/data/accesslog/v2/accesslog.proto @@ -28,10 +28,12 @@ option (gogoproto.stable_marshaler_all) = true; // Fields describing *upstream* interaction will explicitly include ``upstream`` // in their name. -// [#not-implemented-hide:] message TCPAccessLogEntry { // Common properties shared by all Envoy access logs. AccessLogCommon common_properties = 1; + + // Properties of the TCP connection. + ConnectionProperties connection_properties = 2; } message HTTPAccessLogEntry { @@ -54,6 +56,15 @@ message HTTPAccessLogEntry { HTTPResponseProperties response = 4; } +// Defines fields for a connection +message ConnectionProperties { + // Number of bytes received from downstream. + uint64 received_bytes = 1; + + // Number of bytes sent to downstream. + uint64 sent_bytes = 2; +} + // Defines fields that are shared by all Envoy access logs. message AccessLogCommon { // [#not-implemented-hide:] diff --git a/api/envoy/service/accesslog/v2/als.proto b/api/envoy/service/accesslog/v2/als.proto index 1ee6ccd0094c..52788e0659c6 100644 --- a/api/envoy/service/accesslog/v2/als.proto +++ b/api/envoy/service/accesslog/v2/als.proto @@ -53,7 +53,6 @@ message StreamAccessLogsMessage { [(validate.rules).repeated .min_items = 1]; } - // [#not-implemented-hide:] // Wrapper for batches of TCP access log entries. message TCPAccessLogEntries { repeated envoy.data.accesslog.v2.TCPAccessLogEntry log_entry = 1 @@ -67,7 +66,6 @@ message StreamAccessLogsMessage { HTTPAccessLogEntries http_logs = 2; - // [#not-implemented-hide:] TCPAccessLogEntries tcp_logs = 3; } } diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index caf541e796d5..63fb953c060a 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -4,6 +4,7 @@ Version history 1.12.0 (pending) ================ * access log: added :ref:`buffering ` and :ref:`periodical flushing ` support to gRPC access logger. Defaults to 16KB buffer and flushing every 1 second. +* access log: gRPC Access Log Service (ALS) support added for :ref:`TCP access logs `. * admin: added ability to configure listener :ref:`socket options `. * admin: added config dump support for Secret Discovery Service :ref:`SecretConfigDump `. * api: added ::ref:`set_node_on_first_message_only ` option to omit the node identifier from the subsequent discovery requests on the same stream. diff --git a/source/extensions/access_loggers/grpc/BUILD b/source/extensions/access_loggers/grpc/BUILD index e0e705952699..e7ad7b299588 100644 --- a/source/extensions/access_loggers/grpc/BUILD +++ b/source/extensions/access_loggers/grpc/BUILD @@ -11,6 +11,18 @@ load( envoy_package() +envoy_cc_library( + name = "config_utils", + srcs = ["config_utils.cc"], + hdrs = ["config_utils.h"], + deps = [ + ":grpc_access_log_lib", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//include/envoy/singleton:instance_interface", + ], +) + envoy_cc_library( name = "grpc_access_log_lib", srcs = ["grpc_access_log_impl.cc"], @@ -18,7 +30,6 @@ envoy_cc_library( deps = [ "//include/envoy/grpc:async_client_interface", "//include/envoy/grpc:async_client_manager_interface", - "//include/envoy/singleton:instance_interface", "//include/envoy/thread_local:thread_local_interface", "//include/envoy/upstream:cluster_manager_interface", "//include/envoy/upstream:upstream_interface", @@ -54,6 +65,16 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "tcp_grpc_access_log_lib", + srcs = ["tcp_grpc_access_log_impl.cc"], + hdrs = ["tcp_grpc_access_log_impl.h"], + deps = [ + ":grpc_access_log_lib", + ":grpc_access_log_utils", + ], +) + envoy_cc_library( name = "grpc_access_log_proto_descriptors_lib", srcs = ["grpc_access_log_proto_descriptors.cc"], @@ -70,7 +91,7 @@ envoy_cc_library( srcs = ["http_config.cc"], hdrs = ["http_config.h"], deps = [ - "//include/envoy/registry", + ":config_utils", "//include/envoy/server:access_log_config_interface", "//source/common/common:assert_lib", "//source/common/protobuf", @@ -79,3 +100,18 @@ envoy_cc_library( "//source/extensions/access_loggers/grpc:http_grpc_access_log_lib", ], ) + +envoy_cc_library( + name = "tcp_config", + srcs = ["tcp_config.cc"], + hdrs = ["tcp_config.h"], + deps = [ + ":config_utils", + "//include/envoy/server:access_log_config_interface", + "//source/common/common:assert_lib", + "//source/common/protobuf", + "//source/extensions/access_loggers:well_known_names", + "//source/extensions/access_loggers/grpc:grpc_access_log_proto_descriptors_lib", + "//source/extensions/access_loggers/grpc:tcp_grpc_access_log_lib", + ], +) diff --git a/source/extensions/access_loggers/grpc/config_utils.cc b/source/extensions/access_loggers/grpc/config_utils.cc new file mode 100644 index 000000000000..5d2a648a0f5d --- /dev/null +++ b/source/extensions/access_loggers/grpc/config_utils.cc @@ -0,0 +1,25 @@ +#include "extensions/access_loggers/grpc/config_utils.h" + +#include "envoy/singleton/manager.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace GrpcCommon { + +// Singleton registration via macro defined in envoy/singleton/manager.h +SINGLETON_MANAGER_REGISTRATION(grpc_access_logger_cache); + +std::shared_ptr +getGrpcAccessLoggerCacheSingleton(Server::Configuration::FactoryContext& context) { + return context.singletonManager().getTyped( + SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_logger_cache), [&context] { + return std::make_shared( + context.clusterManager().grpcAsyncClientManager(), context.scope(), + context.threadLocal(), context.localInfo()); + }); +} +} // namespace GrpcCommon +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/source/extensions/access_loggers/grpc/config_utils.h b/source/extensions/access_loggers/grpc/config_utils.h new file mode 100644 index 000000000000..f95b53f6a790 --- /dev/null +++ b/source/extensions/access_loggers/grpc/config_utils.h @@ -0,0 +1,18 @@ +#pragma once + +#include "envoy/server/filter_config.h" + +#include "extensions/access_loggers/grpc/grpc_access_log_impl.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace GrpcCommon { + +GrpcAccessLoggerCacheSharedPtr +getGrpcAccessLoggerCacheSingleton(Server::Configuration::FactoryContext& context); + +} // namespace GrpcCommon +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/source/extensions/access_loggers/grpc/grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/grpc_access_log_impl.cc index 38020326b7da..962e3a68084d 100644 --- a/source/extensions/access_loggers/grpc/grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/grpc/grpc_access_log_impl.cc @@ -38,14 +38,22 @@ GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) { approximate_message_size_bytes_ += entry.ByteSizeLong(); - message_.mutable_http_logs()->add_log_entry()->Swap(&entry); + message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry)); + if (approximate_message_size_bytes_ >= buffer_size_bytes_) { + flush(); + } +} + +void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry) { + approximate_message_size_bytes_ += entry.ByteSizeLong(); + message_.mutable_tcp_logs()->mutable_log_entry()->Add(std::move(entry)); if (approximate_message_size_bytes_ >= buffer_size_bytes_) { flush(); } } void GrpcAccessLoggerImpl::flush() { - if (!message_.has_http_logs()) { + if (!message_.has_http_logs() && !message_.has_tcp_logs()) { // Nothing to flush. return; } @@ -88,11 +96,11 @@ GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& a } GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger( - const envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) { + const envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config, + GrpcAccessLoggerType logger_type) { // TODO(euroelessar): Consider cleaning up loggers. auto& cache = tls_slot_->getTyped(); - // TODO(lizan): Include logger type in the hash - const std::size_t cache_key = MessageUtil::hash(config); + const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type); const auto it = cache.access_loggers_.find(cache_key); if (it != cache.access_loggers_.end()) { return it->second; diff --git a/source/extensions/access_loggers/grpc/grpc_access_log_impl.h b/source/extensions/access_loggers/grpc/grpc_access_log_impl.h index 71745adc54d1..8c254e47bca0 100644 --- a/source/extensions/access_loggers/grpc/grpc_access_log_impl.h +++ b/source/extensions/access_loggers/grpc/grpc_access_log_impl.h @@ -36,10 +36,18 @@ class GrpcAccessLogger { * @param entry supplies the access log to send. */ virtual void log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) PURE; + + /** + * Log tcp access entry. + * @param entry supplies the access log to send. + */ + virtual void log(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry) PURE; }; using GrpcAccessLoggerSharedPtr = std::shared_ptr; +enum class GrpcAccessLoggerType { TCP, HTTP }; + /** * Interface for an access logger cache. The cache deals with threading and de-duplicates loggers * for the same configuration. @@ -54,7 +62,8 @@ class GrpcAccessLoggerCache { * @return GrpcAccessLoggerSharedPtr ready for logging requests. */ virtual GrpcAccessLoggerSharedPtr - getOrCreateLogger(const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) PURE; + getOrCreateLogger(const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config, + GrpcAccessLoggerType logger_type) PURE; }; using GrpcAccessLoggerCacheSharedPtr = std::shared_ptr; @@ -66,7 +75,9 @@ class GrpcAccessLoggerImpl : public GrpcAccessLogger { uint64_t buffer_size_bytes, Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info); + // Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger void log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) override; + void log(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry) override; private: struct LocalStream @@ -106,8 +117,9 @@ class GrpcAccessLoggerCacheImpl : public Singleton::Instance, public GrpcAccessL ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info); - GrpcAccessLoggerSharedPtr getOrCreateLogger( - const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) override; + GrpcAccessLoggerSharedPtr + getOrCreateLogger(const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config, + GrpcAccessLoggerType logger_type) override; private: /** @@ -117,8 +129,9 @@ class GrpcAccessLoggerCacheImpl : public Singleton::Instance, public GrpcAccessL ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} Event::Dispatcher& dispatcher_; - // Access loggers indexed by the hash of logger's configuration. - absl::flat_hash_map access_loggers_; + // Access loggers indexed by the hash of logger's configuration and logger type. + absl::flat_hash_map, GrpcAccessLoggerSharedPtr> + access_loggers_; }; Grpc::AsyncClientManager& async_client_manager_; diff --git a/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.cc b/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.cc index 3b038045ee89..6936800be0ff 100644 --- a/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.cc +++ b/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.cc @@ -9,7 +9,7 @@ namespace Envoy { namespace Extensions { namespace AccessLoggers { -namespace HttpGrpc { +namespace GrpcCommon { void validateProtoDescriptors() { const auto method = "envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs"; @@ -17,7 +17,7 @@ void validateProtoDescriptors() { RELEASE_ASSERT(Protobuf::DescriptorPool::generated_pool()->FindMethodByName(method) != nullptr, ""); }; -} // namespace HttpGrpc +} // namespace GrpcCommon } // namespace AccessLoggers } // namespace Extensions } // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h b/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h index 62b70a387a7b..988723cfa2da 100644 --- a/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h +++ b/source/extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h @@ -3,12 +3,12 @@ namespace Envoy { namespace Extensions { namespace AccessLoggers { -namespace HttpGrpc { +namespace GrpcCommon { // This function validates that the method descriptors for gRPC services and type descriptors that // are referenced in Any messages are available in the descriptor pool. void validateProtoDescriptors(); -} // namespace HttpGrpc +} // namespace GrpcCommon } // namespace AccessLoggers } // namespace Extensions } // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/grpc_access_log_utils.cc b/source/extensions/access_loggers/grpc/grpc_access_log_utils.cc index 6bab3fd1e274..66cf3c0a7489 100644 --- a/source/extensions/access_loggers/grpc/grpc_access_log_utils.cc +++ b/source/extensions/access_loggers/grpc/grpc_access_log_utils.cc @@ -116,6 +116,7 @@ void Utility::responseFlagsToAccessLogResponseFlags( void Utility::extractCommonAccessLogProperties( envoy::data::accesslog::v2::AccessLogCommon& common_access_log, const StreamInfo::StreamInfo& stream_info) { + // TODO(mattklein123): Populate sample_rate field. if (stream_info.downstreamRemoteAddress() != nullptr) { Network::Utility::addressToProtobufAddress( *stream_info.downstreamRemoteAddress(), @@ -229,4 +230,4 @@ void Utility::extractCommonAccessLogProperties( } // namespace GrpcCommon } // namespace AccessLoggers } // namespace Extensions -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/http_config.cc b/source/extensions/access_loggers/grpc/http_config.cc index 1047beaa579a..e8f6992600cd 100644 --- a/source/extensions/access_loggers/grpc/http_config.cc +++ b/source/extensions/access_loggers/grpc/http_config.cc @@ -10,6 +10,7 @@ #include "common/grpc/async_client_impl.h" #include "common/protobuf/protobuf.h" +#include "extensions/access_loggers/grpc/config_utils.h" #include "extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h" #include "extensions/access_loggers/grpc/http_grpc_access_log_impl.h" #include "extensions/access_loggers/well_known_names.h" @@ -19,32 +20,23 @@ namespace Extensions { namespace AccessLoggers { namespace HttpGrpc { -// Singleton registration via macro defined in envoy/singleton/manager.h -SINGLETON_MANAGER_REGISTRATION(grpc_access_logger_cache); - AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance(const Protobuf::Message& config, AccessLog::FilterPtr&& filter, Server::Configuration::FactoryContext& context) { - validateProtoDescriptors(); + GrpcCommon::validateProtoDescriptors(); const auto& proto_config = MessageUtil::downcastAndValidate< const envoy::config::accesslog::v2::HttpGrpcAccessLogConfig&>( config, context.messageValidationVisitor()); - std::shared_ptr grpc_access_logger_cache = - context.singletonManager().getTyped( - SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_logger_cache), [&context] { - return std::make_shared( - context.clusterManager().grpcAsyncClientManager(), context.scope(), - context.threadLocal(), context.localInfo()); - }); - - return std::make_shared(std::move(filter), proto_config, context.threadLocal(), - grpc_access_logger_cache); + + return std::make_shared( + std::move(filter), proto_config, context.threadLocal(), + GrpcCommon::getGrpcAccessLoggerCacheSingleton(context)); } ProtobufTypes::MessagePtr HttpGrpcAccessLogFactory::createEmptyConfigProto() { - return ProtobufTypes::MessagePtr{new envoy::config::accesslog::v2::HttpGrpcAccessLogConfig()}; + return std::make_unique(); } std::string HttpGrpcAccessLogFactory::name() const { return AccessLogNames::get().HttpGrpc; } diff --git a/source/extensions/access_loggers/grpc/http_config.h b/source/extensions/access_loggers/grpc/http_config.h index 9e046ac39218..c88a3a5ac62d 100644 --- a/source/extensions/access_loggers/grpc/http_config.h +++ b/source/extensions/access_loggers/grpc/http_config.h @@ -23,8 +23,6 @@ class HttpGrpcAccessLogFactory : public Server::Configuration::AccessLogInstance std::string name() const override; }; -// TODO(mattklein123): Add TCP access log. - } // namespace HttpGrpc } // namespace AccessLoggers } // namespace Extensions diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc index 07b91a0a894a..e5bad26a3efa 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc @@ -34,8 +34,8 @@ HttpGrpcAccessLog::HttpGrpcAccessLog(AccessLog::FilterPtr&& filter, } tls_slot_->set([this](Event::Dispatcher&) { - return std::make_shared( - access_logger_cache_->getOrCreateLogger(config_.common_config())); + return std::make_shared(access_logger_cache_->getOrCreateLogger( + config_.common_config(), GrpcCommon::GrpcAccessLoggerType::HTTP)); }); } diff --git a/source/extensions/access_loggers/grpc/tcp_config.cc b/source/extensions/access_loggers/grpc/tcp_config.cc new file mode 100644 index 000000000000..b8c053ae0c44 --- /dev/null +++ b/source/extensions/access_loggers/grpc/tcp_config.cc @@ -0,0 +1,51 @@ +#include "extensions/access_loggers/grpc/tcp_config.h" + +#include "envoy/config/accesslog/v2/als.pb.validate.h" +#include "envoy/config/filter/accesslog/v2/accesslog.pb.validate.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "common/common/assert.h" +#include "common/common/macros.h" +#include "common/grpc/async_client_impl.h" +#include "common/protobuf/protobuf.h" + +#include "extensions/access_loggers/grpc/config_utils.h" +#include "extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h" +#include "extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h" +#include "extensions/access_loggers/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace TcpGrpc { + +AccessLog::InstanceSharedPtr +TcpGrpcAccessLogFactory::createAccessLogInstance(const Protobuf::Message& config, + AccessLog::FilterPtr&& filter, + Server::Configuration::FactoryContext& context) { + GrpcCommon::validateProtoDescriptors(); + + const auto& proto_config = + MessageUtil::downcastAndValidate( + config, context.messageValidationVisitor()); + + return std::make_shared(std::move(filter), proto_config, context.threadLocal(), + GrpcCommon::getGrpcAccessLoggerCacheSingleton(context)); +} + +ProtobufTypes::MessagePtr TcpGrpcAccessLogFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +std::string TcpGrpcAccessLogFactory::name() const { return AccessLogNames::get().TcpGrpc; } + +/** + * Static registration for the TCP gRPC access log. @see RegisterFactory. + */ +REGISTER_FACTORY(TcpGrpcAccessLogFactory, Server::Configuration::AccessLogInstanceFactory); + +} // namespace TcpGrpc +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/tcp_config.h b/source/extensions/access_loggers/grpc/tcp_config.h new file mode 100644 index 000000000000..39bc986146b9 --- /dev/null +++ b/source/extensions/access_loggers/grpc/tcp_config.h @@ -0,0 +1,29 @@ +#pragma once + +#include + +#include "envoy/server/access_log_config.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace TcpGrpc { + +/** + * Config registration for the TCP gRPC access log. @see AccessLogInstanceFactory. + */ +class TcpGrpcAccessLogFactory : public Server::Configuration::AccessLogInstanceFactory { +public: + AccessLog::InstanceSharedPtr + createAccessLogInstance(const Protobuf::Message& config, AccessLog::FilterPtr&& filter, + Server::Configuration::FactoryContext& context) override; + + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + + std::string name() const override; +}; + +} // namespace TcpGrpc +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc new file mode 100644 index 000000000000..c6c8a1bb5a58 --- /dev/null +++ b/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc @@ -0,0 +1,48 @@ +#include "extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h" + +#include "common/common/assert.h" +#include "common/network/utility.h" +#include "common/stream_info/utility.h" + +#include "extensions/access_loggers/grpc/grpc_access_log_utils.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace TcpGrpc { + +TcpGrpcAccessLog::ThreadLocalLogger::ThreadLocalLogger(GrpcCommon::GrpcAccessLoggerSharedPtr logger) + : logger_(std::move(logger)) {} + +TcpGrpcAccessLog::TcpGrpcAccessLog(AccessLog::FilterPtr&& filter, + envoy::config::accesslog::v2::TcpGrpcAccessLogConfig config, + ThreadLocal::SlotAllocator& tls, + GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache) + : Common::ImplBase(std::move(filter)), config_(std::move(config)), + tls_slot_(tls.allocateSlot()), access_logger_cache_(std::move(access_logger_cache)) { + tls_slot_->set([this](Event::Dispatcher&) { + return std::make_shared(access_logger_cache_->getOrCreateLogger( + config_.common_config(), GrpcCommon::GrpcAccessLoggerType::TCP)); + }); +} + +void TcpGrpcAccessLog::emitLog(const Http::HeaderMap&, const Http::HeaderMap&, + const Http::HeaderMap&, const StreamInfo::StreamInfo& stream_info) { + // Common log properties. + envoy::data::accesslog::v2::TCPAccessLogEntry log_entry; + GrpcCommon::Utility::extractCommonAccessLogProperties(*log_entry.mutable_common_properties(), + stream_info); + + envoy::data::accesslog::v2::ConnectionProperties& connection_properties = + *log_entry.mutable_connection_properties(); + connection_properties.set_received_bytes(stream_info.bytesReceived()); + connection_properties.set_sent_bytes(stream_info.bytesSent()); + + // request_properties->set_request_body_bytes(stream_info.bytesReceived()); + tls_slot_->getTyped().logger_->log(std::move(log_entry)); +} + +} // namespace TcpGrpc +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h b/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h new file mode 100644 index 000000000000..115c8a467719 --- /dev/null +++ b/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include + +#include "envoy/config/accesslog/v2/als.pb.h" +#include "envoy/config/filter/accesslog/v2/accesslog.pb.h" +#include "envoy/grpc/async_client.h" +#include "envoy/grpc/async_client_manager.h" +#include "envoy/local_info/local_info.h" +#include "envoy/service/accesslog/v2/als.pb.h" +#include "envoy/singleton/instance.h" +#include "envoy/thread_local/thread_local.h" + +#include "common/grpc/typed_async_client.h" + +#include "extensions/access_loggers/common/access_log_base.h" +#include "extensions/access_loggers/grpc/grpc_access_log_impl.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace TcpGrpc { + +// TODO(mattklein123): Stats + +/** + * Access log Instance that streams TCP logs over gRPC. + */ +class TcpGrpcAccessLog : public Common::ImplBase { +public: + TcpGrpcAccessLog(AccessLog::FilterPtr&& filter, + envoy::config::accesslog::v2::TcpGrpcAccessLogConfig config, + ThreadLocal::SlotAllocator& tls, + GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache); + +private: + /** + * Per-thread cached logger. + */ + struct ThreadLocalLogger : public ThreadLocal::ThreadLocalObject { + ThreadLocalLogger(GrpcCommon::GrpcAccessLoggerSharedPtr logger); + + const GrpcCommon::GrpcAccessLoggerSharedPtr logger_; + }; + + // Common::ImplBase + void emitLog(const Http::HeaderMap& request_headers, const Http::HeaderMap& response_headers, + const Http::HeaderMap& response_trailers, + const StreamInfo::StreamInfo& stream_info) override; + + const envoy::config::accesslog::v2::TcpGrpcAccessLogConfig config_; + const ThreadLocal::SlotPtr tls_slot_; + const GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache_; +}; + +} // namespace TcpGrpc +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/well_known_names.h b/source/extensions/access_loggers/well_known_names.h index 7c5e7576c1df..dc55f536bfad 100644 --- a/source/extensions/access_loggers/well_known_names.h +++ b/source/extensions/access_loggers/well_known_names.h @@ -18,6 +18,8 @@ class AccessLogNameValues { const std::string File = "envoy.file_access_log"; // HTTP gRPC access log const std::string HttpGrpc = "envoy.http_grpc_access_log"; + // TCP gRPC access log + const std::string TcpGrpc = "envoy.tcp_grpc_access_log"; }; using AccessLogNames = ConstSingleton; diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index f8b1ccc5158f..5366f7ba55fb 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -6,6 +6,7 @@ EXTENSIONS = { "envoy.access_loggers.file": "//source/extensions/access_loggers/file:config", "envoy.access_loggers.http_grpc": "//source/extensions/access_loggers/grpc:http_config", + "envoy.access_loggers.tcp_grpc": "//source/extensions/access_loggers/grpc:tcp_config", # # Clusters diff --git a/test/extensions/access_loggers/grpc/BUILD b/test/extensions/access_loggers/grpc/BUILD index 9e2c7b46240f..652fc010d578 100644 --- a/test/extensions/access_loggers/grpc/BUILD +++ b/test/extensions/access_loggers/grpc/BUILD @@ -77,3 +77,20 @@ envoy_extension_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_extension_cc_test( + name = "tcp_grpc_access_log_integration_test", + srcs = ["tcp_grpc_access_log_integration_test.cc"], + extension_name = "envoy.access_loggers.http_grpc", + deps = [ + "//source/common/buffer:zero_copy_input_stream_lib", + "//source/common/grpc:codec_lib", + "//source/common/grpc:common_lib", + "//source/extensions/access_loggers/grpc:http_config", + "//source/extensions/access_loggers/grpc:tcp_config", + "//source/extensions/filters/network/tcp_proxy:config", + "//test/common/grpc:grpc_client_integration_lib", + "//test/integration:http_integration_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc index 83ba234f7973..07875b008ddf 100644 --- a/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc @@ -271,21 +271,26 @@ TEST_F(GrpcAccessLoggerCacheImplTest, Deduplication) { config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("cluster-1"); expectClientCreation(); - GrpcAccessLoggerSharedPtr logger1 = logger_cache_->getOrCreateLogger(config); - EXPECT_EQ(logger1, logger_cache_->getOrCreateLogger(config)); + GrpcAccessLoggerSharedPtr logger1 = + logger_cache_->getOrCreateLogger(config, GrpcAccessLoggerType::HTTP); + EXPECT_EQ(logger1, logger_cache_->getOrCreateLogger(config, GrpcAccessLoggerType::HTTP)); + + // Do not deduplicate different types of logger + expectClientCreation(); + EXPECT_NE(logger1, logger_cache_->getOrCreateLogger(config, GrpcAccessLoggerType::TCP)); // Changing log name leads to another logger. config.set_log_name("log-2"); expectClientCreation(); - EXPECT_NE(logger1, logger_cache_->getOrCreateLogger(config)); + EXPECT_NE(logger1, logger_cache_->getOrCreateLogger(config, GrpcAccessLoggerType::HTTP)); config.set_log_name("log-1"); - EXPECT_EQ(logger1, logger_cache_->getOrCreateLogger(config)); + EXPECT_EQ(logger1, logger_cache_->getOrCreateLogger(config, GrpcAccessLoggerType::HTTP)); // Changing cluster name leads to another logger. config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("cluster-2"); expectClientCreation(); - EXPECT_NE(logger1, logger_cache_->getOrCreateLogger(config)); + EXPECT_NE(logger1, logger_cache_->getOrCreateLogger(config, GrpcAccessLoggerType::HTTP)); } } // namespace diff --git a/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc b/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc index 22e1f7cd59c0..893d23d61243 100644 --- a/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc @@ -14,6 +14,7 @@ using namespace std::chrono_literals; using testing::_; +using testing::An; using testing::InSequence; using testing::Invoke; using testing::NiceMock; @@ -25,18 +26,22 @@ namespace AccessLoggers { namespace HttpGrpc { namespace { +using envoy::data::accesslog::v2::HTTPAccessLogEntry; + class MockGrpcAccessLogger : public GrpcCommon::GrpcAccessLogger { public: // GrpcAccessLogger - MOCK_METHOD1(log, void(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry)); + MOCK_METHOD1(log, void(HTTPAccessLogEntry&& entry)); + MOCK_METHOD1(log, void(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry)); }; class MockGrpcAccessLoggerCache : public GrpcCommon::GrpcAccessLoggerCache { public: // GrpcAccessLoggerCache - MOCK_METHOD1(getOrCreateLogger, + MOCK_METHOD2(getOrCreateLogger, GrpcCommon::GrpcAccessLoggerSharedPtr( - const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config)); + const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config, + GrpcCommon::GrpcAccessLoggerType logger_type)); }; class HttpGrpcAccessLogTest : public testing::Test { @@ -44,9 +49,11 @@ class HttpGrpcAccessLogTest : public testing::Test { void init() { ON_CALL(*filter_, evaluate(_, _, _, _)).WillByDefault(Return(true)); config_.mutable_common_config()->set_log_name("hello_log"); - EXPECT_CALL(*logger_cache_, getOrCreateLogger(_)) - .WillOnce([this](const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) { + EXPECT_CALL(*logger_cache_, getOrCreateLogger(_, _)) + .WillOnce([this](const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config, + GrpcCommon::GrpcAccessLoggerType logger_type) { EXPECT_EQ(config.DebugString(), config_.common_config().DebugString()); + EXPECT_EQ(GrpcCommon::GrpcAccessLoggerType::HTTP, logger_type); return logger_; }); access_log_ = std::make_unique(AccessLog::FilterPtr{filter_}, config_, tls_, @@ -58,9 +65,9 @@ class HttpGrpcAccessLogTest : public testing::Test { init(); } - envoy::data::accesslog::v2::HTTPAccessLogEntry expected_log_entry; + HTTPAccessLogEntry expected_log_entry; TestUtility::loadFromYaml(expected_log_entry_yaml, expected_log_entry); - EXPECT_CALL(*logger_, log(_)) + EXPECT_CALL(*logger_, log(An())) .WillOnce( Invoke([expected_log_entry](envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) { EXPECT_EQ(entry.DebugString(), expected_log_entry.DebugString()); diff --git a/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc b/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc new file mode 100644 index 000000000000..8ad6bbe7bb68 --- /dev/null +++ b/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc @@ -0,0 +1,189 @@ +#include "envoy/config/accesslog/v2/als.pb.h" +#include "envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.pb.validate.h" +#include "envoy/service/accesslog/v2/als.pb.h" + +#include "common/buffer/zero_copy_input_stream_impl.h" +#include "common/common/version.h" +#include "common/grpc/codec.h" +#include "common/grpc/common.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/integration/http_integration.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +using testing::AssertionResult; + +namespace Envoy { +namespace { + +void clearPort(envoy::api::v2::core::Address& address) { + address.mutable_socket_address()->clear_port_specifier(); +} + +class TcpGrpcAccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, + public BaseIntegrationTest { +public: + TcpGrpcAccessLogIntegrationTest() + : BaseIntegrationTest(ipVersion(), ConfigHelper::TCP_PROXY_CONFIG) { + enable_half_close_ = true; + } + + ~TcpGrpcAccessLogIntegrationTest() override { + test_server_.reset(); + fake_upstreams_.clear(); + } + + void createUpstreams() override { + BaseIntegrationTest::createUpstreams(); + fake_upstreams_.emplace_back( + new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, timeSystem())); + } + + void initialize() override { + config_helper_.renameListener("tcp_proxy"); + config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + auto* accesslog_cluster = bootstrap.mutable_static_resources()->add_clusters(); + accesslog_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + accesslog_cluster->set_name("accesslog"); + accesslog_cluster->mutable_http2_protocol_options(); + }); + + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0); + auto* filter_chain = listener->mutable_filter_chains(0); + auto* config_blob = filter_chain->mutable_filters(0)->mutable_config(); + + envoy::config::filter::network::tcp_proxy::v2::TcpProxy tcp_proxy_config; + TestUtility::jsonConvert(*config_blob, tcp_proxy_config); + + auto* access_log = tcp_proxy_config.add_access_log(); + access_log->set_name("envoy.tcp_grpc_access_log"); + envoy::config::accesslog::v2::TcpGrpcAccessLogConfig access_log_config; + auto* common_config = access_log_config.mutable_common_config(); + common_config->set_log_name("foo"); + setGrpcService(*common_config->mutable_grpc_service(), "accesslog", + fake_upstreams_.back()->localAddress()); + TestUtility::jsonConvert(access_log_config, *access_log->mutable_config()); + + TestUtility::jsonConvert(tcp_proxy_config, *config_blob); + }); + BaseIntegrationTest::initialize(); + } + + ABSL_MUST_USE_RESULT + AssertionResult waitForAccessLogConnection() { + return fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_access_log_connection_); + } + + ABSL_MUST_USE_RESULT + AssertionResult waitForAccessLogStream() { + return fake_access_log_connection_->waitForNewStream(*dispatcher_, access_log_request_); + } + + ABSL_MUST_USE_RESULT + AssertionResult waitForAccessLogRequest(const std::string& expected_request_msg_yaml) { + envoy::service::accesslog::v2::StreamAccessLogsMessage request_msg; + VERIFY_ASSERTION(access_log_request_->waitForGrpcMessage(*dispatcher_, request_msg)); + EXPECT_EQ("POST", access_log_request_->headers().Method()->value().getStringView()); + EXPECT_EQ("/envoy.service.accesslog.v2.AccessLogService/StreamAccessLogs", + access_log_request_->headers().Path()->value().getStringView()); + EXPECT_EQ("application/grpc", + access_log_request_->headers().ContentType()->value().getStringView()); + + envoy::service::accesslog::v2::StreamAccessLogsMessage expected_request_msg; + TestUtility::loadFromYaml(expected_request_msg_yaml, expected_request_msg); + + // Clear fields which are not deterministic. + auto* log_entry = request_msg.mutable_tcp_logs()->mutable_log_entry(0); + clearPort(*log_entry->mutable_common_properties()->mutable_downstream_remote_address()); + clearPort(*log_entry->mutable_common_properties()->mutable_downstream_local_address()); + clearPort(*log_entry->mutable_common_properties()->mutable_upstream_remote_address()); + clearPort(*log_entry->mutable_common_properties()->mutable_upstream_local_address()); + log_entry->mutable_common_properties()->clear_start_time(); + log_entry->mutable_common_properties()->clear_time_to_last_rx_byte(); + log_entry->mutable_common_properties()->clear_time_to_first_downstream_tx_byte(); + log_entry->mutable_common_properties()->clear_time_to_last_downstream_tx_byte(); + EXPECT_EQ(request_msg.DebugString(), expected_request_msg.DebugString()); + + return AssertionSuccess(); + } + + void cleanup() { + if (fake_access_log_connection_ != nullptr) { + AssertionResult result = fake_access_log_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = fake_access_log_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + fake_access_log_connection_ = nullptr; + } + } + + FakeHttpConnectionPtr fake_access_log_connection_; + FakeStreamPtr access_log_request_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, TcpGrpcAccessLogIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +// Test a basic full access logging flow. +TEST_P(TcpGrpcAccessLogIntegrationTest, BasicAccessLogFlow) { + initialize(); + + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + + ASSERT_TRUE(fake_upstream_connection->write("hello")); + tcp_client->waitForData("hello"); + tcp_client->write("bar", false); + + ASSERT_TRUE(fake_upstream_connection->write("", true)); + tcp_client->waitForHalfClose(); + tcp_client->write("", true); + ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); + + ASSERT_TRUE(waitForAccessLogConnection()); + ASSERT_TRUE(waitForAccessLogStream()); + ASSERT_TRUE(waitForAccessLogRequest( + fmt::format(R"EOF( +identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + build_version: {} + log_name: foo +tcp_logs: + log_entry: + common_properties: + downstream_remote_address: + socket_address: + address: {} + downstream_local_address: + socket_address: + address: {} + upstream_remote_address: + socket_address: + address: {} + upstream_local_address: + socket_address: + address: {} + upstream_cluster: cluster_0 + connection_properties: + received_bytes: 3 + sent_bytes: 5 +)EOF", + VersionInfo::version(), Network::Test::getLoopbackAddressString(ipVersion()), + Network::Test::getLoopbackAddressString(ipVersion()), + Network::Test::getLoopbackAddressString(ipVersion()), + Network::Test::getLoopbackAddressString(ipVersion())))); + + cleanup(); +} + +} // namespace +} // namespace Envoy