Skip to content

Commit

Permalink
accesslog: implement TCP gRPC access logger (#7941)
Browse files Browse the repository at this point in the history
Description:
Initial implementation for TCP gRPC access logger.

Risk Level: Low (extension only)
Testing: integration test
Docs Changes: Added
Release Notes: Added

Signed-off-by: Lizan Zhou <lizan@tetrate.io>
  • Loading branch information
lizan authored Aug 26, 2019
1 parent d39bd81 commit 8bdebbf
Show file tree
Hide file tree
Showing 26 changed files with 564 additions and 52 deletions.
1 change: 0 additions & 1 deletion api/envoy/config/accesslog/v2/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ message HttpGrpcAccessLogConfig {

// Configuration for the built-in *envoy.tcp_grpc_access_log* type. This configuration will
// populate *StreamAccessLogsMessage.tcp_logs*.
// [#not-implemented-hide:]
message TcpGrpcAccessLogConfig {
CommonGrpcAccessLogConfig common_config = 1 [(validate.rules).message.required = true];
}
Expand Down
3 changes: 3 additions & 0 deletions api/envoy/config/filter/accesslog/v2/accesslog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message AccessLog {
//
// #. "envoy.file_access_log"
// #. "envoy.http_grpc_access_log"
// #. "envoy.tcp_grpc_access_log"
string name = 1;

// Filter which is used to determine if the access log needs to be written.
Expand All @@ -36,6 +37,8 @@ message AccessLog {
// <envoy_api_msg_config.accesslog.v2.FileAccessLog>`
// #. "envoy.http_grpc_access_log": :ref:`HttpGrpcAccessLogConfig
// <envoy_api_msg_config.accesslog.v2.HttpGrpcAccessLogConfig>`
// #. "envoy.tcp_grpc_access_log": :ref:`TcpGrpcAccessLogConfig
// <envoy_api_msg_config.accesslog.v2.TcpGrpcAccessLogConfig>`
oneof config_type {
google.protobuf.Struct config = 3;

Expand Down
13 changes: 12 additions & 1 deletion api/envoy/data/accesslog/v2/accesslog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ option (gogoproto.stable_marshaler_all) = true;
// Fields describing *upstream* interaction will explicitly include ``upstream``
// in their name.

// [#not-implemented-hide:]
message TCPAccessLogEntry {
// Common properties shared by all Envoy access logs.
AccessLogCommon common_properties = 1;

// Properties of the TCP connection.
ConnectionProperties connection_properties = 2;
}

message HTTPAccessLogEntry {
Expand All @@ -54,6 +56,15 @@ message HTTPAccessLogEntry {
HTTPResponseProperties response = 4;
}

// Defines fields for a connection
message ConnectionProperties {
// Number of bytes received from downstream.
uint64 received_bytes = 1;

// Number of bytes sent to downstream.
uint64 sent_bytes = 2;
}

// Defines fields that are shared by all Envoy access logs.
message AccessLogCommon {
// [#not-implemented-hide:]
Expand Down
2 changes: 0 additions & 2 deletions api/envoy/service/accesslog/v2/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ message StreamAccessLogsMessage {
[(validate.rules).repeated .min_items = 1];
}

// [#not-implemented-hide:]
// Wrapper for batches of TCP access log entries.
message TCPAccessLogEntries {
repeated envoy.data.accesslog.v2.TCPAccessLogEntry log_entry = 1
Expand All @@ -67,7 +66,6 @@ message StreamAccessLogsMessage {

HTTPAccessLogEntries http_logs = 2;

// [#not-implemented-hide:]
TCPAccessLogEntries tcp_logs = 3;
}
}
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Version history
1.12.0 (pending)
================
* access log: added :ref:`buffering <envoy_api_field_config.accesslog.v2.CommonGrpcAccessLogConfig.buffer_size_bytes>` and :ref:`periodical flushing <envoy_api_field_config.accesslog.v2.CommonGrpcAccessLogConfig.buffer_flush_interval>` support to gRPC access logger. Defaults to 16KB buffer and flushing every 1 second.
* access log: gRPC Access Log Service (ALS) support added for :ref:`TCP access logs <envoy_api_msg_config.accesslog.v2.TcpGrpcAccessLogConfig>`.
* admin: added ability to configure listener :ref:`socket options <envoy_api_field_config.bootstrap.v2.Admin.socket_options>`.
* admin: added config dump support for Secret Discovery Service :ref:`SecretConfigDump <envoy_api_msg_admin.v2alpha.SecretsConfigDump>`.
* api: added ::ref:`set_node_on_first_message_only <envoy_api_field_core.ApiConfigSource.set_node_on_first_message_only>` option to omit the node identifier from the subsequent discovery requests on the same stream.
Expand Down
40 changes: 38 additions & 2 deletions source/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,25 @@ load(

envoy_package()

envoy_cc_library(
name = "config_utils",
srcs = ["config_utils.cc"],
hdrs = ["config_utils.h"],
deps = [
":grpc_access_log_lib",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//include/envoy/singleton:instance_interface",
],
)

envoy_cc_library(
name = "grpc_access_log_lib",
srcs = ["grpc_access_log_impl.cc"],
hdrs = ["grpc_access_log_impl.h"],
deps = [
"//include/envoy/grpc:async_client_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/singleton:instance_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
Expand Down Expand Up @@ -54,6 +65,16 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "tcp_grpc_access_log_lib",
srcs = ["tcp_grpc_access_log_impl.cc"],
hdrs = ["tcp_grpc_access_log_impl.h"],
deps = [
":grpc_access_log_lib",
":grpc_access_log_utils",
],
)

envoy_cc_library(
name = "grpc_access_log_proto_descriptors_lib",
srcs = ["grpc_access_log_proto_descriptors.cc"],
Expand All @@ -70,7 +91,7 @@ envoy_cc_library(
srcs = ["http_config.cc"],
hdrs = ["http_config.h"],
deps = [
"//include/envoy/registry",
":config_utils",
"//include/envoy/server:access_log_config_interface",
"//source/common/common:assert_lib",
"//source/common/protobuf",
Expand All @@ -79,3 +100,18 @@ envoy_cc_library(
"//source/extensions/access_loggers/grpc:http_grpc_access_log_lib",
],
)

envoy_cc_library(
name = "tcp_config",
srcs = ["tcp_config.cc"],
hdrs = ["tcp_config.h"],
deps = [
":config_utils",
"//include/envoy/server:access_log_config_interface",
"//source/common/common:assert_lib",
"//source/common/protobuf",
"//source/extensions/access_loggers:well_known_names",
"//source/extensions/access_loggers/grpc:grpc_access_log_proto_descriptors_lib",
"//source/extensions/access_loggers/grpc:tcp_grpc_access_log_lib",
],
)
25 changes: 25 additions & 0 deletions source/extensions/access_loggers/grpc/config_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "extensions/access_loggers/grpc/config_utils.h"

#include "envoy/singleton/manager.h"

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace GrpcCommon {

// Singleton registration via macro defined in envoy/singleton/manager.h
SINGLETON_MANAGER_REGISTRATION(grpc_access_logger_cache);

std::shared_ptr<GrpcCommon::GrpcAccessLoggerCache>
getGrpcAccessLoggerCacheSingleton(Server::Configuration::FactoryContext& context) {
return context.singletonManager().getTyped<GrpcCommon::GrpcAccessLoggerCache>(
SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_logger_cache), [&context] {
return std::make_shared<GrpcCommon::GrpcAccessLoggerCacheImpl>(
context.clusterManager().grpcAsyncClientManager(), context.scope(),
context.threadLocal(), context.localInfo());
});
}
} // namespace GrpcCommon
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
18 changes: 18 additions & 0 deletions source/extensions/access_loggers/grpc/config_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include "envoy/server/filter_config.h"

#include "extensions/access_loggers/grpc/grpc_access_log_impl.h"

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace GrpcCommon {

GrpcAccessLoggerCacheSharedPtr
getGrpcAccessLoggerCacheSingleton(Server::Configuration::FactoryContext& context);

} // namespace GrpcCommon
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
18 changes: 13 additions & 5 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,22 @@ GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std

void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) {
approximate_message_size_bytes_ += entry.ByteSizeLong();
message_.mutable_http_logs()->add_log_entry()->Swap(&entry);
message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry));
if (approximate_message_size_bytes_ >= buffer_size_bytes_) {
flush();
}
}

void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry) {
approximate_message_size_bytes_ += entry.ByteSizeLong();
message_.mutable_tcp_logs()->mutable_log_entry()->Add(std::move(entry));
if (approximate_message_size_bytes_ >= buffer_size_bytes_) {
flush();
}
}

void GrpcAccessLoggerImpl::flush() {
if (!message_.has_http_logs()) {
if (!message_.has_http_logs() && !message_.has_tcp_logs()) {
// Nothing to flush.
return;
}
Expand Down Expand Up @@ -88,11 +96,11 @@ GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& a
}

GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger(
const envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) {
const envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config,
GrpcAccessLoggerType logger_type) {
// TODO(euroelessar): Consider cleaning up loggers.
auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
// TODO(lizan): Include logger type in the hash
const std::size_t cache_key = MessageUtil::hash(config);
const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
const auto it = cache.access_loggers_.find(cache_key);
if (it != cache.access_loggers_.end()) {
return it->second;
Expand Down
23 changes: 18 additions & 5 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ class GrpcAccessLogger {
* @param entry supplies the access log to send.
*/
virtual void log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) PURE;

/**
* Log tcp access entry.
* @param entry supplies the access log to send.
*/
virtual void log(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry) PURE;
};

using GrpcAccessLoggerSharedPtr = std::shared_ptr<GrpcAccessLogger>;

enum class GrpcAccessLoggerType { TCP, HTTP };

/**
* Interface for an access logger cache. The cache deals with threading and de-duplicates loggers
* for the same configuration.
Expand All @@ -54,7 +62,8 @@ class GrpcAccessLoggerCache {
* @return GrpcAccessLoggerSharedPtr ready for logging requests.
*/
virtual GrpcAccessLoggerSharedPtr
getOrCreateLogger(const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) PURE;
getOrCreateLogger(const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config,
GrpcAccessLoggerType logger_type) PURE;
};

using GrpcAccessLoggerCacheSharedPtr = std::shared_ptr<GrpcAccessLoggerCache>;
Expand All @@ -66,7 +75,9 @@ class GrpcAccessLoggerImpl : public GrpcAccessLogger {
uint64_t buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info);

// Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger
void log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) override;
void log(envoy::data::accesslog::v2::TCPAccessLogEntry&& entry) override;

private:
struct LocalStream
Expand Down Expand Up @@ -106,8 +117,9 @@ class GrpcAccessLoggerCacheImpl : public Singleton::Instance, public GrpcAccessL
ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info);

GrpcAccessLoggerSharedPtr getOrCreateLogger(
const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config) override;
GrpcAccessLoggerSharedPtr
getOrCreateLogger(const ::envoy::config::accesslog::v2::CommonGrpcAccessLogConfig& config,
GrpcAccessLoggerType logger_type) override;

private:
/**
Expand All @@ -117,8 +129,9 @@ class GrpcAccessLoggerCacheImpl : public Singleton::Instance, public GrpcAccessL
ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}

Event::Dispatcher& dispatcher_;
// Access loggers indexed by the hash of logger's configuration.
absl::flat_hash_map<std::size_t, GrpcAccessLoggerSharedPtr> access_loggers_;
// Access loggers indexed by the hash of logger's configuration and logger type.
absl::flat_hash_map<std::pair<std::size_t, GrpcAccessLoggerType>, GrpcAccessLoggerSharedPtr>
access_loggers_;
};

Grpc::AsyncClientManager& async_client_manager_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace HttpGrpc {
namespace GrpcCommon {

void validateProtoDescriptors() {
const auto method = "envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs";

RELEASE_ASSERT(Protobuf::DescriptorPool::generated_pool()->FindMethodByName(method) != nullptr,
"");
};
} // namespace HttpGrpc
} // namespace GrpcCommon
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace HttpGrpc {
namespace GrpcCommon {

// This function validates that the method descriptors for gRPC services and type descriptors that
// are referenced in Any messages are available in the descriptor pool.
void validateProtoDescriptors();
} // namespace HttpGrpc
} // namespace GrpcCommon
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ void Utility::responseFlagsToAccessLogResponseFlags(
void Utility::extractCommonAccessLogProperties(
envoy::data::accesslog::v2::AccessLogCommon& common_access_log,
const StreamInfo::StreamInfo& stream_info) {
// TODO(mattklein123): Populate sample_rate field.
if (stream_info.downstreamRemoteAddress() != nullptr) {
Network::Utility::addressToProtobufAddress(
*stream_info.downstreamRemoteAddress(),
Expand Down Expand Up @@ -229,4 +230,4 @@ void Utility::extractCommonAccessLogProperties(
} // namespace GrpcCommon
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy
22 changes: 7 additions & 15 deletions source/extensions/access_loggers/grpc/http_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/grpc/async_client_impl.h"
#include "common/protobuf/protobuf.h"

#include "extensions/access_loggers/grpc/config_utils.h"
#include "extensions/access_loggers/grpc/grpc_access_log_proto_descriptors.h"
#include "extensions/access_loggers/grpc/http_grpc_access_log_impl.h"
#include "extensions/access_loggers/well_known_names.h"
Expand All @@ -19,32 +20,23 @@ namespace Extensions {
namespace AccessLoggers {
namespace HttpGrpc {

// Singleton registration via macro defined in envoy/singleton/manager.h
SINGLETON_MANAGER_REGISTRATION(grpc_access_logger_cache);

AccessLog::InstanceSharedPtr
HttpGrpcAccessLogFactory::createAccessLogInstance(const Protobuf::Message& config,
AccessLog::FilterPtr&& filter,
Server::Configuration::FactoryContext& context) {
validateProtoDescriptors();
GrpcCommon::validateProtoDescriptors();

const auto& proto_config = MessageUtil::downcastAndValidate<
const envoy::config::accesslog::v2::HttpGrpcAccessLogConfig&>(
config, context.messageValidationVisitor());
std::shared_ptr<GrpcCommon::GrpcAccessLoggerCache> grpc_access_logger_cache =
context.singletonManager().getTyped<GrpcCommon::GrpcAccessLoggerCache>(
SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_logger_cache), [&context] {
return std::make_shared<GrpcCommon::GrpcAccessLoggerCacheImpl>(
context.clusterManager().grpcAsyncClientManager(), context.scope(),
context.threadLocal(), context.localInfo());
});

return std::make_shared<HttpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
grpc_access_logger_cache);

return std::make_shared<HttpGrpcAccessLog>(
std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context));
}

ProtobufTypes::MessagePtr HttpGrpcAccessLogFactory::createEmptyConfigProto() {
return ProtobufTypes::MessagePtr{new envoy::config::accesslog::v2::HttpGrpcAccessLogConfig()};
return std::make_unique<envoy::config::accesslog::v2::HttpGrpcAccessLogConfig>();
}

std::string HttpGrpcAccessLogFactory::name() const { return AccessLogNames::get().HttpGrpc; }
Expand Down
Loading

0 comments on commit 8bdebbf

Please sign in to comment.