Skip to content

Commit

Permalink
use move semantics for submitTrace
Browse files Browse the repository at this point in the history
Signed-off-by: Yuval Kohavi <yuval.kohavi@gmail.com>
  • Loading branch information
yuval-k committed Apr 25, 2019
1 parent 629bbfb commit 015c5e2
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 111 deletions.
6 changes: 4 additions & 2 deletions source/extensions/common/tap/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ void AdminHandler::unregisterConfig(ExtensionConfig& config) {
}

void AdminHandler::AdminPerTapSinkHandle::submitTrace(
const TraceWrapperSharedPtr& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
ENVOY_LOG(debug, "admin submitting buffered trace to main thread");
// Convert to a shared_ptr, so we can send it to the main thread.
std::shared_ptr<envoy::data::tap::v2alpha::TraceWrapper> shared_trace{std::move(trace)};
// The handle can be destroyed before the cross thread post is complete. Thus, we capture a
// reference to our parent.
parent_.main_thread_dispatcher_.post([& parent = parent_, trace, format]() {
parent_.main_thread_dispatcher_.post([& parent = parent_, trace = shared_trace, format]() {
if (!parent.attached_request_.has_value()) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/common/tap/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AdminHandler : public Singleton::Instance,
AdminPerTapSinkHandle(AdminHandler& parent) : parent_(parent) {}

// Extensions::Common::Tap::PerTapSinkHandle
void submitTrace(const TraceWrapperSharedPtr& trace,
void submitTrace(TraceWrapperPtr&& trace,
envoy::service::tap::v2alpha::OutputSink::Format format) override;

AdminHandler& parent_;
Expand Down
11 changes: 6 additions & 5 deletions source/extensions/common/tap/tap.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ namespace Extensions {
namespace Common {
namespace Tap {

using TraceWrapperSharedPtr = std::shared_ptr<envoy::data::tap::v2alpha::TraceWrapper>;
inline TraceWrapperSharedPtr makeTraceWrapper() {
return std::make_shared<envoy::data::tap::v2alpha::TraceWrapper>();
using TraceWrapper = envoy::data::tap::v2alpha::TraceWrapper;
using TraceWrapperPtr = std::unique_ptr<envoy::data::tap::v2alpha::TraceWrapper>;
inline TraceWrapperPtr makeTraceWrapper() {
return std::make_unique<envoy::data::tap::v2alpha::TraceWrapper>();
}

/**
Expand All @@ -33,7 +34,7 @@ class PerTapSinkHandle {
* @param trace supplies the trace to send.
* @param format supplies the output format to use.
*/
virtual void submitTrace(const TraceWrapperSharedPtr& trace,
virtual void submitTrace(TraceWrapperPtr&& trace,
envoy::service::tap::v2alpha::OutputSink::Format format) PURE;
};

Expand All @@ -51,7 +52,7 @@ class PerTapSinkHandleManager {
/**
* Submit a buffered or streamed trace segment to all managed per-tap sink handles.
*/
virtual void submitTrace(const TraceWrapperSharedPtr& trace) PURE;
virtual void submitTrace(TraceWrapperPtr&& trace) PURE;
};

using PerTapSinkHandleManagerPtr = std::unique_ptr<PerTapSinkHandleManager>;
Expand Down
7 changes: 3 additions & 4 deletions source/extensions/common/tap/tap_config_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,13 @@ void Utility::bodyBytesToString(envoy::data::tap::v2alpha::TraceWrapper& trace,
}
}

void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(
const TraceWrapperSharedPtr& trace) {
void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(TraceWrapperPtr&& trace) {
Utility::bodyBytesToString(*trace, parent_.sink_format_);
handle_->submitTrace(trace, parent_.sink_format_);
handle_->submitTrace(std::move(trace), parent_.sink_format_);
}

void FilePerTapSink::FilePerTapSinkHandle::submitTrace(
const TraceWrapperSharedPtr& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
if (!output_file_.is_open()) {
std::string path = fmt::format("{}_{}", parent_.config_.path_prefix(), trace_id_);
switch (format) {
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/common/tap/tap_config_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TapConfigBaseImpl : public virtual TapConfig {
: parent_(parent), handle_(parent.sink_to_use_->createPerTapSinkHandle(trace_id)) {}

// PerTapSinkHandleManager
void submitTrace(const TraceWrapperSharedPtr& trace) override;
void submitTrace(TraceWrapperPtr&& trace) override;

private:
TapConfigBaseImpl& parent_;
Expand Down Expand Up @@ -129,7 +129,7 @@ class FilePerTapSink : public Sink {
: parent_(parent), trace_id_(trace_id) {}

// PerTapSinkHandle
void submitTrace(const TraceWrapperSharedPtr& trace,
void submitTrace(TraceWrapperPtr&& trace,
envoy::service::tap::v2alpha::OutputSink::Format format) override;

FilePerTapSink& parent_;
Expand Down
134 changes: 57 additions & 77 deletions source/extensions/filters/http/tap/tap_config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ HttpPerRequestTapperPtr HttpTapConfigImpl::createPerRequestTapper(uint64_t strea
}

void HttpPerRequestTapperImpl::streamRequestHeaders() {
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
request_headers_->iterate(
fillHeaderList,
trace->mutable_http_streamed_trace_segment()->mutable_request_headers()->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}

void HttpPerRequestTapperImpl::onRequestHeaders(const Http::HeaderMap& headers) {
Expand All @@ -51,54 +51,24 @@ void HttpPerRequestTapperImpl::onRequestHeaders(const Http::HeaderMap& headers)

void HttpPerRequestTapperImpl::streamBufferedRequestBody() {
if (buffered_streamed_request_body_ != nullptr) {
sink_handle_->submitTrace(buffered_streamed_request_body_);
sink_handle_->submitTrace(std::move(buffered_streamed_request_body_));
buffered_streamed_request_body_.reset();
}
}

void HttpPerRequestTapperImpl::onRequestBody(const Buffer::Instance& data) {
// TODO(mattklein123): Body matching.
if (config_->streaming()) {
const auto match_status = config_->rootMatcher().matchStatus(statuses_);
// Without body matching, we must have already started tracing or have not yet matched.
ASSERT(started_streaming_trace_ || !match_status.matches_);

if (started_streaming_trace_) {
// If we have already started streaming, flush a body segment now.
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::Utility::addBufferToProtoBytes(
*trace->mutable_http_streamed_trace_segment()->mutable_request_body_chunk(),
config_->maxBufferedRxBytes(), data, 0, data.length());
sink_handle_->submitTrace(trace);
} else if (match_status.might_change_status_) {
// If we might still match, start buffering the request body up to our limit.
if (buffered_streamed_request_body_ == nullptr) {
buffered_streamed_request_body_ = makeTraceSegment();
}
auto& body = *buffered_streamed_request_body_->mutable_http_streamed_trace_segment()
->mutable_request_body_chunk();
ASSERT(body.as_bytes().size() <= config_->maxBufferedRxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedRxBytes() - body.as_bytes().size(), data, 0, data.length());
}
} else {
// If we are not streaming, buffer the request body up to our limit.
makeBufferedFullTraceIfNeeded();
auto& body =
*buffered_full_trace_->mutable_http_buffered_trace()->mutable_request()->mutable_body();
ASSERT(body.as_bytes().size() <= config_->maxBufferedRxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedRxBytes() - body.as_bytes().size(), data, 0, data.length());
}
onBody(data, buffered_streamed_request_body_, config_->maxBufferedRxBytes(),
&envoy::data::tap::v2alpha::HttpStreamedTraceSegment::mutable_request_body_chunk,
&envoy::data::tap::v2alpha::HttpBufferedTrace::mutable_request);
}

void HttpPerRequestTapperImpl::streamRequestTrailers() {
if (request_trailers_ != nullptr) {
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
request_trailers_->iterate(fillHeaderList, trace->mutable_http_streamed_trace_segment()
->mutable_request_trailers()
->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}
}

Expand All @@ -118,11 +88,11 @@ void HttpPerRequestTapperImpl::onRequestTrailers(const Http::HeaderMap& trailers
}

void HttpPerRequestTapperImpl::streamResponseHeaders() {
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
response_headers_->iterate(
fillHeaderList,
trace->mutable_http_streamed_trace_segment()->mutable_response_headers()->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}

void HttpPerRequestTapperImpl::onResponseHeaders(const Http::HeaderMap& headers) {
Expand All @@ -143,45 +113,15 @@ void HttpPerRequestTapperImpl::onResponseHeaders(const Http::HeaderMap& headers)

void HttpPerRequestTapperImpl::streamBufferedResponseBody() {
if (buffered_streamed_response_body_ != nullptr) {
sink_handle_->submitTrace(buffered_streamed_response_body_);
sink_handle_->submitTrace(std::move(buffered_streamed_response_body_));
buffered_streamed_response_body_.reset();
}
}

void HttpPerRequestTapperImpl::onResponseBody(const Buffer::Instance& data) {
// TODO(mattklein123): Body matching.
if (config_->streaming()) {
const auto match_status = config_->rootMatcher().matchStatus(statuses_);
// Without body matching, we must have already started tracing or have not yet matched.
ASSERT(started_streaming_trace_ || !match_status.matches_);

if (started_streaming_trace_) {
// If we have already started streaming, flush a body segment now.
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::Utility::addBufferToProtoBytes(
*trace->mutable_http_streamed_trace_segment()->mutable_response_body_chunk(),
config_->maxBufferedTxBytes(), data, 0, data.length());
sink_handle_->submitTrace(trace);
} else if (match_status.might_change_status_) {
// If we might still match, start buffering the request body up to our limit.
if (buffered_streamed_response_body_ == nullptr) {
buffered_streamed_response_body_ = makeTraceSegment();
}
auto& body = *buffered_streamed_response_body_->mutable_http_streamed_trace_segment()
->mutable_response_body_chunk();
ASSERT(body.as_bytes().size() <= config_->maxBufferedTxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedTxBytes() - body.as_bytes().size(), data, 0, data.length());
}
} else {
// If we are not streaming, buffer the response body up to our limit.
makeBufferedFullTraceIfNeeded();
auto& body =
*buffered_full_trace_->mutable_http_buffered_trace()->mutable_response()->mutable_body();
ASSERT(body.as_bytes().size() <= config_->maxBufferedTxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedTxBytes() - body.as_bytes().size(), data, 0, data.length());
}
onBody(data, buffered_streamed_response_body_, config_->maxBufferedTxBytes(),
&envoy::data::tap::v2alpha::HttpStreamedTraceSegment::mutable_response_body_chunk,
&envoy::data::tap::v2alpha::HttpBufferedTrace::mutable_response);
}

void HttpPerRequestTapperImpl::onResponseTrailers(const Http::HeaderMap& trailers) {
Expand All @@ -198,11 +138,11 @@ void HttpPerRequestTapperImpl::onResponseTrailers(const Http::HeaderMap& trailer
streamBufferedResponseBody();
}

TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
trailers.iterate(fillHeaderList, trace->mutable_http_streamed_trace_segment()
->mutable_response_trailers()
->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}
}

Expand All @@ -227,10 +167,50 @@ bool HttpPerRequestTapperImpl::onDestroyLog() {
}

ENVOY_LOG(debug, "submitting buffered trace sink");
sink_handle_->submitTrace(buffered_full_trace_);
// move is safe as onDestroyLog is the last method called.
sink_handle_->submitTrace(std::move(buffered_full_trace_));
return true;
}

void HttpPerRequestTapperImpl::onBody(
const Buffer::Instance& data, Extensions::Common::Tap::TraceWrapperPtr& buffered_streamed_body,
uint32_t maxBufferedBytes, MutableBodyChunk mutable_body_chunk,
MutableMessage mutable_message) {
// TODO(mattklein123): Body matching.
if (config_->streaming()) {
const auto match_status = config_->rootMatcher().matchStatus(statuses_);
// Without body matching, we must have already started tracing or have not yet matched.
ASSERT(started_streaming_trace_ || !match_status.matches_);

if (started_streaming_trace_) {
// If we have already started streaming, flush a body segment now.
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
TapCommon::Utility::addBufferToProtoBytes(
*(trace->mutable_http_streamed_trace_segment()->*mutable_body_chunk)(), maxBufferedBytes,
data, 0, data.length());
sink_handle_->submitTrace(std::move(trace));
} else if (match_status.might_change_status_) {
// If we might still match, start buffering the body up to our limit.
if (buffered_streamed_body == nullptr) {
buffered_streamed_body = makeTraceSegment();
}
auto& body =
*(buffered_streamed_body->mutable_http_streamed_trace_segment()->*mutable_body_chunk)();
ASSERT(body.as_bytes().size() <= maxBufferedBytes);
TapCommon::Utility::addBufferToProtoBytes(body, maxBufferedBytes - body.as_bytes().size(),
data, 0, data.length());
}
} else {
// If we are not streaming, buffer the body up to our limit.
makeBufferedFullTraceIfNeeded();
auto& body =
*(buffered_full_trace_->mutable_http_buffered_trace()->*mutable_message)()->mutable_body();
ASSERT(body.as_bytes().size() <= maxBufferedBytes);
TapCommon::Utility::addBufferToProtoBytes(body, maxBufferedBytes - body.as_bytes().size(), data,
0, data.length());
}
}

} // namespace TapFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
21 changes: 15 additions & 6 deletions source/extensions/filters/http/tap/tap_config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,24 @@ class HttpPerRequestTapperImpl : public HttpPerRequestTapper, Logger::Loggable<L
bool onDestroyLog() override;

private:
typedef envoy::data::tap::v2alpha::Body* (
envoy::data::tap::v2alpha::HttpStreamedTraceSegment::*MutableBodyChunk)();
typedef envoy::data::tap::v2alpha::HttpBufferedTrace::Message* (
envoy::data::tap::v2alpha::HttpBufferedTrace::*MutableMessage)();

void onBody(const Buffer::Instance& data,
Extensions::Common::Tap::TraceWrapperPtr& buffered_streamed_body,
uint32_t maxBufferedBytes, MutableBodyChunk mutable_body_chunk,
MutableMessage mutable_message);

void makeBufferedFullTraceIfNeeded() {
if (buffered_full_trace_ == nullptr) {
buffered_full_trace_ = Extensions::Common::Tap::makeTraceWrapper();
}
}

Extensions::Common::Tap::TraceWrapperSharedPtr makeTraceSegment() {
Extensions::Common::Tap::TraceWrapperSharedPtr segment =
Extensions::Common::Tap::makeTraceWrapper();
Extensions::Common::Tap::TraceWrapperPtr makeTraceSegment() {
Extensions::Common::Tap::TraceWrapperPtr segment = Extensions::Common::Tap::makeTraceWrapper();
segment->mutable_http_streamed_trace_segment()->set_trace_id(stream_id_);
return segment;
}
Expand All @@ -71,9 +80,9 @@ class HttpPerRequestTapperImpl : public HttpPerRequestTapper, Logger::Loggable<L
const Http::HeaderMap* response_headers_{};
const Http::HeaderMap* response_trailers_{};
// Must be a shared_ptr because of submitTrace().
Extensions::Common::Tap::TraceWrapperSharedPtr buffered_streamed_request_body_;
Extensions::Common::Tap::TraceWrapperSharedPtr buffered_streamed_response_body_;
Extensions::Common::Tap::TraceWrapperSharedPtr buffered_full_trace_;
Extensions::Common::Tap::TraceWrapperPtr buffered_streamed_request_body_;
Extensions::Common::Tap::TraceWrapperPtr buffered_streamed_response_body_;
Extensions::Common::Tap::TraceWrapperPtr buffered_full_trace_;
};

} // namespace TapFilter
Expand Down
Loading

0 comments on commit 015c5e2

Please sign in to comment.