Skip to content

Commit

Permalink
Lightstep gPRC generation for tracing (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanDzhabarov authored Oct 3, 2016
1 parent 775f112 commit ce76d30
Show file tree
Hide file tree
Showing 23 changed files with 461 additions and 518 deletions.
1 change: 1 addition & 0 deletions ci/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ $EXTRA_CMAKE_FLAGS -DENVOY_DEBUG:BOOL=OFF \
-DENVOY_TCLAP_INCLUDE_DIR:FILEPATH=/thirdparty/tclap-1.2.1/include \
-DENVOY_JANSSON_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_OPENSSL_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_PROTOBUF_INCLUDE_DIR:FILEPATH=/thirdparty_build/include \
-DENVOY_PROTOBUF_PROTOC:FILEPATH=/thirdparty_build/bin/protoc \
-DENVOY_GCOVR:FILEPATH=/thirdparty/gcovr-3.3/scripts/gcovr \
Expand Down
1 change: 1 addition & 0 deletions configs/envoy_double_proxy.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
},
{
"name": "lightstep_saas",
"features": "http2",
"ssl_context": {
"ca_cert_file": "/etc/ssl/certs/ca-certificates.crt",
"verify_subject_alt_name": "collector.lightstep.com"
Expand Down
1 change: 1 addition & 0 deletions configs/envoy_front_proxy.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
},
{
"name": "lightstep_saas",
"features": "http2",
"ssl_context": {
"ca_cert_file": "/etc/ssl/certs/ca-certificates.crt",
"verify_subject_alt_name": "collector.lightstep.com"
Expand Down
1 change: 1 addition & 0 deletions configs/envoy_service_to_service.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@
},
{
"name": "lightstep_saas",
"features": "http2",
"ssl_context": {
"ca_cert_file": "/etc/ssl/certs/ca-certificates.crt",
"verify_subject_alt_name": "collector.lightstep.com"
Expand Down
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ endif()
include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR})
include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR})
include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR})
include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR})

set_target_properties(envoy-common PROPERTIES COTIRE_CXX_PREFIX_HEADER_INIT
"../precompiled/precompiled.h")
Expand Down
5 changes: 5 additions & 0 deletions source/common/common/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,8 @@ bool StringUtil::startsWith(const std::string& source, const std::string& start,
return strncasecmp(source.c_str(), start.c_str(), start.size()) == 0;
}
}

const std::string& StringUtil::valueOrDefault(const std::string& input,
const std::string& default_value) {
return input.empty() ? default_value : input;
}
6 changes: 6 additions & 0 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,10 @@ class StringUtil {
*/
static bool startsWith(const std::string& source, const std::string& start,
bool case_sensitive = true);

/**
* @return original @param input string if it's not empty or @param default_value otherwise.
*/
static const std::string& valueOrDefault(const std::string& input,
const std::string& default_value);
};
74 changes: 74 additions & 0 deletions source/common/grpc/common.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "common.h"

#include "common/buffer/buffer_impl.h"
#include "common/common/enum_to_int.h"
#include "common/common/utility.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
#include "common/http/utility.h"

namespace Grpc {

const std::string Common::GRPC_CONTENT_TYPE{"application/grpc"};
Expand All @@ -15,4 +22,71 @@ void Common::chargeStat(Stats::Store& store, const std::string& cluster,
.inc();
}

Buffer::InstancePtr Common::serializeBody(const google::protobuf::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(message.ByteSize());
body->add(&size, sizeof(size));
body->add(message.SerializeAsString());

return body;
}

Http::MessagePtr Common::prepareHeaders(const std::string& upstream_cluster,
const std::string& service_full_name,
const std::string& method_name) {
Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
message->headers().addViaMoveValue(Http::Headers::get().Path,
fmt::format("/{}/{}", service_full_name, method_name));
message->headers().addViaCopy(Http::Headers::get().Host, upstream_cluster);
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);

return message;
}

void Common::checkForHeaderOnlyError(Http::Message& http_response) {
// First check for grpc-status in headers. If it is here, we have an error.
const std::string& grpc_status_header = http_response.headers().get(Common::GRPC_STATUS_HEADER);
if (grpc_status_header.empty()) {
return;
}

uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status header");
}

const std::string& grpc_status_message = http_response.headers().get(Common::GRPC_MESSAGE_HEADER);
throw Exception(grpc_status_code, grpc_status_message);
}

void Common::validateResponse(Http::Message& http_response) {
if (Http::Utility::getResponseStatus(http_response.headers()) != enumToInt(Http::Code::OK)) {
throw Exception(Optional<uint64_t>(), "non-200 response code");
}

checkForHeaderOnlyError(http_response);

// Check for existence of trailers.
if (!http_response.trailers()) {
throw Exception(Optional<uint64_t>(), "no response trailers");
}

const std::string& grpc_status_header = http_response.trailers()->get(Common::GRPC_STATUS_HEADER);
const std::string& grpc_status_message =
http_response.trailers()->get(Common::GRPC_MESSAGE_HEADER);
uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status trailer");
}

if (grpc_status_code != 0) {
throw Exception(grpc_status_code, grpc_status_message);
}
}

} // Grpc
32 changes: 32 additions & 0 deletions source/common/grpc/common.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
#pragma once

#include "envoy/common/exception.h"
#include "envoy/common/optional.h"
#include "envoy/http/header_map.h"
#include "envoy/http/message.h"
#include "envoy/stats/stats.h"

#include "google/protobuf/message.h"

namespace Grpc {

class Exception : public EnvoyException {
public:
Exception(const Optional<uint64_t>& grpc_status, const std::string& message)
: EnvoyException(message), grpc_status_(grpc_status) {}

const Optional<uint64_t> grpc_status_;
};

class Common {
public:
/**
Expand All @@ -18,10 +31,29 @@ class Common {
static void chargeStat(Stats::Store& store, const std::string& cluster,
const std::string& grpc_service, const std::string& grpc_method,
bool success);
/**
* Serialize protobuf message.
*/
static Buffer::InstancePtr serializeBody(const google::protobuf::Message& message);

/**
* Prepare headers for protobuf service.
*/
static Http::MessagePtr prepareHeaders(const std::string& upstream_cluster,
const std::string& service_full_name,
const std::string& method_name);

/**
* Basic validation of gRPC response, @throws Grpc::Exception in case of non successful response.
*/
static void validateResponse(Http::Message& http_response);

static const std::string GRPC_CONTENT_TYPE;
static const Http::LowerCaseString GRPC_MESSAGE_HEADER;
static const Http::LowerCaseString GRPC_STATUS_HEADER;

private:
static void checkForHeaderOnlyError(Http::Message& http_response);
};

} // Grpc
87 changes: 15 additions & 72 deletions source/common/grpc/rpc_channel_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,9 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
// here for clarity.
ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2);

Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
message->headers().addViaMoveValue(
Http::Headers::get().Path,
fmt::format("/{}/{}", method->service()->full_name(), method->name()));
message->headers().addViaCopy(Http::Headers::get().Host, cluster_);
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);
message->body(serializeBody(*grpc_request));
Http::MessagePtr message =
Common::prepareHeaders(cluster_, method->service()->full_name(), method->name());
message->body(Common::serializeBody(*grpc_request));

callbacks_.onPreRequestCustomizeHeaders(message->headers());
http_request_ = cm_.httpAsyncClientForCluster(cluster_).send(std::move(message), *this, timeout_);
Expand All @@ -49,61 +43,21 @@ void RpcChannelImpl::incStat(bool success) {
grpc_method_->name(), success);
}

void RpcChannelImpl::checkForHeaderOnlyError(Http::Message& http_response) {
// First check for grpc-status in headers. If it is here, we have an error.
const std::string& grpc_status_header = http_response.headers().get(Common::GRPC_STATUS_HEADER);
if (grpc_status_header.empty()) {
return;
}

uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status header");
}

const std::string& grpc_status_message = http_response.headers().get(Common::GRPC_MESSAGE_HEADER);
throw Exception(grpc_status_code, grpc_status_message);
}

void RpcChannelImpl::onSuccessWorker(Http::Message& http_response) {
if (Http::Utility::getResponseStatus(http_response.headers()) != enumToInt(Http::Code::OK)) {
throw Exception(Optional<uint64_t>(), "non-200 response code");
}

checkForHeaderOnlyError(http_response);

// Check for existance of trailers.
if (!http_response.trailers()) {
throw Exception(Optional<uint64_t>(), "no response trailers");
}

const std::string& grpc_status_header = http_response.trailers()->get(Common::GRPC_STATUS_HEADER);
const std::string& grpc_status_message =
http_response.trailers()->get(Common::GRPC_MESSAGE_HEADER);
uint64_t grpc_status_code;
if (!StringUtil::atoul(grpc_status_header.c_str(), grpc_status_code)) {
throw Exception(Optional<uint64_t>(), "bad grpc-status trailer");
}
void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) {
try {
Common::validateResponse(*http_response);

if (grpc_status_code != 0) {
throw Exception(grpc_status_code, grpc_status_message);
}
// A gRPC response contains a 5 byte header. Currently we only support unary responses so we
// ignore the header. @see serializeBody().
if (!http_response->body() || !(http_response->body()->length() > 5)) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}

// A GRPC response contains a 5 byte header. Currently we only support unary responses so we
// ignore the header. @see serializeBody().
if (!http_response.body() || !(http_response.body()->length() > 5)) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}
http_response->body()->drain(5);
if (!grpc_response_->ParseFromString(http_response->bodyAsString())) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}

http_response.body()->drain(5);
if (!grpc_response_->ParseFromString(http_response.bodyAsString())) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}
}

void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) {
try {
onSuccessWorker(*http_response);
callbacks_.onSuccess();
incStat(true);
onComplete();
Expand Down Expand Up @@ -133,15 +87,4 @@ void RpcChannelImpl::onComplete() {
grpc_response_ = nullptr;
}

Buffer::InstancePtr RpcChannelImpl::serializeBody(const proto::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
Buffer::InstancePtr body(new Buffer::OwnedImpl());
uint8_t compressed = 0;
body->add(&compressed, sizeof(compressed));
uint32_t size = htonl(message.ByteSize());
body->add(&size, sizeof(size));
body->add(message.SerializeAsString());
return body;
}

} // Grpc
9 changes: 0 additions & 9 deletions source/common/grpc/rpc_channel_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,6 @@ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks {
proto::Closure* done_callback) override;

private:
class Exception : public EnvoyException {
public:
Exception(const Optional<uint64_t>& grpc_status, const std::string& message)
: EnvoyException(message), grpc_status_(grpc_status) {}

const Optional<uint64_t> grpc_status_;
};

void checkForHeaderOnlyError(Http::Message& http_response);
void incStat(bool success);
void onComplete();
void onFailureWorker(const Optional<uint64_t>& grpc_status, const std::string& message);
Expand Down
Loading

0 comments on commit ce76d30

Please sign in to comment.