Skip to content

Commit

Permalink
grpc: utilities for inter-converting grpc::ByteBuffer and Buffer::Ins…
Browse files Browse the repository at this point in the history
…tance. (#6732)

Add utilities for converting between grpc::ByteBuffer and Buffer::Instance and
for adding gRPC frame headers to Buffer::Instances. This is in support for proving raw
(Buffer::Instance) gRPC support which is in support of #4272. This PR also renames Grpc::Common::serializeBody
to Grpc::Common::serializeToGrpcFrame which is a better description of the actual behavior
as per comments on #6525,

Risk Level: low
Testing: Complete unit tests provided.

Signed-off-by: John Plevyak <jplevyak@gmail.com>
  • Loading branch information
jplevyak authored and htuch committed May 10, 2019
1 parent 4c80194 commit d18b461
Show file tree
Hide file tree
Showing 17 changed files with 334 additions and 19 deletions.
21 changes: 21 additions & 0 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ envoy_cc_library(
hdrs = ["async_client_manager_impl.h"],
deps = [
":async_client_lib",
":common_lib",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/singleton:manager_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand Down Expand Up @@ -82,6 +83,25 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "google_grpc_utils_lib",
srcs = ["google_grpc_utils.cc"],
hdrs = ["google_grpc_utils.h"],
external_deps = [
"abseil_optional",
"grpc",
],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:enum_to_int",
"//source/common/common:macros",
"//source/common/common:utility_lib",
"//source/common/grpc:status_lib",
],
)

envoy_cc_library(
name = "google_async_client_lib",
srcs = ["google_async_client_impl.cc"],
Expand All @@ -91,6 +111,7 @@ envoy_cc_library(
"grpc",
],
deps = [
":common_lib",
":google_grpc_creds_lib",
"//include/envoy/api:api_interface",
"//include/envoy/grpc:google_grpc_creds_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void AsyncStreamImpl::onReset() {
}

void AsyncStreamImpl::sendMessage(const Protobuf::Message& request, bool end_stream) {
stream_->sendData(*Common::serializeBody(request), end_stream);
stream_->sendData(*Common::serializeToGrpcFrame(request), end_stream);
}

void AsyncStreamImpl::closeStream() {
Expand Down
29 changes: 28 additions & 1 deletion source/common/grpc/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <arpa/inet.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <string>
Expand All @@ -12,6 +13,7 @@
#include "common/common/enum_to_int.h"
#include "common/common/fmt.h"
#include "common/common/macros.h"
#include "common/common/stack_array.h"
#include "common/common/utility.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
Expand Down Expand Up @@ -112,9 +114,11 @@ bool Common::resolveServiceAndMethod(const Http::HeaderEntry* path, std::string*
return true;
}

Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) {
Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
// Reserve enough space for the entire message and the 5 byte header.
// NB: we do not use prependGrpcFrameHeader because that would add another BufferFragment and this
// (using a single BufferFragment) is more efficient.
Buffer::InstancePtr body(new Buffer::OwnedImpl());
const uint32_t size = message.ByteSize();
const uint32_t alloc_size = size + 5;
Expand All @@ -134,6 +138,21 @@ Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) {
return body;
}

Buffer::InstancePtr Common::serializeMessage(const Protobuf::Message& message) {
auto body = std::make_unique<Buffer::OwnedImpl>();
const uint32_t size = message.ByteSize();
Buffer::RawSlice iovec;
body->reserve(size, &iovec, 1);
ASSERT(iovec.len_ >= size);
iovec.len_ = size;
uint8_t* current = reinterpret_cast<uint8_t*>(iovec.mem_);
Protobuf::io::ArrayOutputStream stream(current, size, -1);
Protobuf::io::CodedOutputStream codec_stream(&stream);
message.SerializeWithCachedSizes(&codec_stream);
body->commit(&iovec, 1);
return body;
}

std::chrono::milliseconds Common::getGrpcTimeout(Http::HeaderMap& request_headers) {
std::chrono::milliseconds timeout(0);
Http::HeaderEntry* header_grpc_timeout_entry = request_headers.GrpcTimeout();
Expand Down Expand Up @@ -267,5 +286,13 @@ std::string Common::typeUrl(const std::string& qualified_name) {
return typeUrlPrefix() + "/" + qualified_name;
}

void Common::prependGrpcFrameHeader(Buffer::Instance& buffer) {
std::array<char, 5> header;
header[0] = 0; // flags
const uint32_t nsize = htonl(buffer.length());
std::memcpy(&header[1], reinterpret_cast<const void*>(&nsize), sizeof(uint32_t));
buffer.prepend(absl::string_view(&header[0], 5));
}

} // namespace Grpc
} // namespace Envoy
15 changes: 13 additions & 2 deletions source/common/grpc/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,14 @@ class Common {
std::string* method);

/**
* Serialize protobuf message.
* Serialize protobuf message with gRPC frame header.
*/
static Buffer::InstancePtr serializeBody(const Protobuf::Message& message);
static Buffer::InstancePtr serializeToGrpcFrame(const Protobuf::Message& message);

/**
* Serialize protobuf message. Without grpc header.
*/
static Buffer::InstancePtr serializeMessage(const Protobuf::Message& message);

/**
* Prepare headers for protobuf service.
Expand All @@ -148,6 +153,12 @@ class Common {
*/
static std::string typeUrl(const std::string& qualified_name);

/**
* Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame.
* @param buffer containing the frame data which will be modified.
*/
static void prependGrpcFrameHeader(Buffer::Instance& buffer);

private:
static void checkForHeaderOnlyError(Http::Message& http_response);
};
Expand Down
108 changes: 108 additions & 0 deletions source/common/grpc/google_grpc_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include "common/grpc/google_grpc_utils.h"

#include <atomic>
#include <cstdint>
#include <cstring>
#include <string>

#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/fmt.h"
#include "common/common/macros.h"
#include "common/common/stack_array.h"
#include "common/common/utility.h"

#include "absl/strings/match.h"

namespace Envoy {
namespace Grpc {

struct BufferInstanceContainer {
BufferInstanceContainer(int ref_count, Buffer::InstancePtr&& buffer)
: ref_count_(ref_count), buffer_(std::move(buffer)) {}
std::atomic<uint32_t> ref_count_; // In case gPRC dereferences in a different threads.
Buffer::InstancePtr buffer_;

static void derefBufferInstanceContainer(void* container_ptr) {
auto container = static_cast<BufferInstanceContainer*>(container_ptr);
container->ref_count_--;
// This is safe because the ref_count_ is never incremented.
if (container->ref_count_ <= 0) {
delete container;
}
}
};

grpc::ByteBuffer GoogleGrpcUtils::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) {
if (!buffer_instance) {
return {};
}
Buffer::RawSlice on_raw_slice;
// NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details).
const int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1);
if (n_slices <= 0) {
return {};
}
auto* container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)};
if (n_slices == 1) {
grpc::Slice one_slice(on_raw_slice.mem_, on_raw_slice.len_,
&BufferInstanceContainer::derefBufferInstanceContainer, container);
return {&one_slice, 1};
}
STACK_ARRAY(many_raw_slices, Buffer::RawSlice, n_slices);
container->buffer_->getRawSlices(many_raw_slices.begin(), n_slices);
std::vector<grpc::Slice> slices;
slices.reserve(n_slices);
for (int i = 0; i < n_slices; i++) {
slices.emplace_back(many_raw_slices[i].mem_, many_raw_slices[i].len_,
&BufferInstanceContainer::derefBufferInstanceContainer, container);
}
return {&slices[0], slices.size()};
}

struct ByteBufferContainer {
ByteBufferContainer(int ref_count) : ref_count_(ref_count) {}
~ByteBufferContainer() { ::free(fragments_); }
uint32_t ref_count_;
Buffer::BufferFragmentImpl* fragments_ = nullptr;
std::vector<grpc::Slice> slices_;
};

Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& byte_buffer) {
auto buffer = std::make_unique<Buffer::OwnedImpl>();
if (byte_buffer.Length() == 0) {
return buffer;
}
// NB: ByteBuffer::Dump moves the data out of the ByteBuffer so we need to ensure that the
// lifetime of the Slice(s) exceeds our Buffer::Instance.
std::vector<grpc::Slice> slices;
byte_buffer.Dump(&slices);
auto* container = new ByteBufferContainer(static_cast<int>(slices.size()));
std::function<void(const void*, size_t, const Buffer::BufferFragmentImpl*)> releaser =
[container](const void*, size_t, const Buffer::BufferFragmentImpl*) {
container->ref_count_--;
if (container->ref_count_ <= 0) {
delete container;
}
};
// NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we
// need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance.
RELEASE_ASSERT(!::posix_memalign(reinterpret_cast<void**>(&container->fragments_),
alignof(Buffer::BufferFragmentImpl),
sizeof(Buffer::BufferFragmentImpl) * slices.size()),
"posix_memalign failure");
for (size_t i = 0; i < slices.size(); i++) {
new (&container->fragments_[i])
Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser);
}
for (size_t i = 0; i < slices.size(); i++) {
buffer->addBufferFragment(container->fragments_[i]);
}
container->slices_ = std::move(slices);
return buffer;
}

} // namespace Grpc
} // namespace Envoy
33 changes: 33 additions & 0 deletions source/common/grpc/google_grpc_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <cstdint>
#include <string>

#include "envoy/buffer/buffer.h"

#include "grpcpp/grpcpp.h"

namespace Envoy {
namespace Grpc {

class GoogleGrpcUtils {
public:
/**
* Build grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr.
* @param buffer source data container.
* @return byteBuffer target container aliased to the data in Buffer::Instance and owning the
* Buffer::Instance.
*/
static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& buffer);

/**
* Build Buffer::Instance which aliases the data in a grpc::ByteBuffer.
* @param buffer source data container.
* @return a Buffer::InstancePtr aliased to the data in the provided grpc::ByteBuffer and
* owning the corresponding grpc::Slice(s).
*/
static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& buffer);
};

} // namespace Grpc
} // namespace Envoy
2 changes: 1 addition & 1 deletion source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() {
request.set_service(parent_.service_name_.value());
}

request_encoder_->encodeData(*Grpc::Common::serializeBody(request), true);
request_encoder_->encodeData(*Grpc::Common::serializeToGrpcFrame(request), true);
}

void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void LightStepDriver::LightStepTransporter::Send(const Protobuf::Message& reques
Http::MessagePtr message = Grpc::Common::prepareHeaders(
driver_.cluster()->name(), lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName(), absl::optional<std::chrono::milliseconds>(timeout));
message->body() = Grpc::Common::serializeBody(request);
message->body() = Grpc::Common::serializeToGrpcFrame(request);

active_request_ =
driver_.clusterManager()
Expand Down
12 changes: 12 additions & 0 deletions test/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "google_grpc_utils_test",
srcs = envoy_select_google_grpc(["google_grpc_utils_test.cc"]),
deps = [
"//source/common/grpc:common_lib",
"//source/common/http:headers_lib",
"//test/mocks/upstream:upstream_mocks",
"//test/proto:helloworld_proto_cc",
"//test/test_common:utility_lib",
] + envoy_select_google_grpc(["//source/common/grpc:google_grpc_utils_lib"]),
)

envoy_cc_test(
name = "google_async_client_impl_test",
srcs = envoy_select_google_grpc(["google_async_client_impl_test.cc"]),
Expand Down
15 changes: 15 additions & 0 deletions test/common/grpc/common_test.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <arpa/inet.h>

#include "common/grpc/common.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
Expand Down Expand Up @@ -349,5 +351,18 @@ TEST(GrpcCommonTest, ValidateResponse) {
}
}

// Ensure that the correct gPRC header is constructed for a Buffer::Instance.
TEST(GrpcCommonTest, PrependGrpcFrameHeader) {
auto buffer = std::make_unique<Buffer::OwnedImpl>();
buffer->add("test", 4);
std::array<char, 5> expected_header;
expected_header[0] = 0; // flags
const uint32_t nsize = htonl(4);
std::memcpy(&expected_header[1], reinterpret_cast<const void*>(&nsize), sizeof(uint32_t));
std::string header_string(&expected_header[0], 5);
Common::prependGrpcFrameHeader(*buffer);
EXPECT_EQ(buffer->toString(), header_string + "test");
}

} // namespace Grpc
} // namespace Envoy
Loading

0 comments on commit d18b461

Please sign in to comment.