Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tap: use move semantics for submitTrace #6709

Merged
merged 2 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions source/extensions/common/tap/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ void AdminHandler::unregisterConfig(ExtensionConfig& config) {
}

void AdminHandler::AdminPerTapSinkHandle::submitTrace(
const TraceWrapperSharedPtr& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
ENVOY_LOG(debug, "admin submitting buffered trace to main thread");
// Convert to a shared_ptr, so we can send it to the main thread.
std::shared_ptr<envoy::data::tap::v2alpha::TraceWrapper> shared_trace{std::move(trace)};
// The handle can be destroyed before the cross thread post is complete. Thus, we capture a
// reference to our parent.
parent_.main_thread_dispatcher_.post([& parent = parent_, trace, format]() {
parent_.main_thread_dispatcher_.post([& parent = parent_, trace = shared_trace, format]() {
if (!parent.attached_request_.has_value()) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/common/tap/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AdminHandler : public Singleton::Instance,
AdminPerTapSinkHandle(AdminHandler& parent) : parent_(parent) {}

// Extensions::Common::Tap::PerTapSinkHandle
void submitTrace(const TraceWrapperSharedPtr& trace,
void submitTrace(TraceWrapperPtr&& trace,
envoy::service::tap::v2alpha::OutputSink::Format format) override;

AdminHandler& parent_;
Expand Down
11 changes: 6 additions & 5 deletions source/extensions/common/tap/tap.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ namespace Extensions {
namespace Common {
namespace Tap {

using TraceWrapperSharedPtr = std::shared_ptr<envoy::data::tap::v2alpha::TraceWrapper>;
inline TraceWrapperSharedPtr makeTraceWrapper() {
return std::make_shared<envoy::data::tap::v2alpha::TraceWrapper>();
using TraceWrapper = envoy::data::tap::v2alpha::TraceWrapper;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this used? Either use it or delete?

using TraceWrapperPtr = std::unique_ptr<envoy::data::tap::v2alpha::TraceWrapper>;
inline TraceWrapperPtr makeTraceWrapper() {
return std::make_unique<envoy::data::tap::v2alpha::TraceWrapper>();
}

/**
Expand All @@ -33,7 +34,7 @@ class PerTapSinkHandle {
* @param trace supplies the trace to send.
* @param format supplies the output format to use.
*/
virtual void submitTrace(const TraceWrapperSharedPtr& trace,
virtual void submitTrace(TraceWrapperPtr&& trace,
envoy::service::tap::v2alpha::OutputSink::Format format) PURE;
};

Expand All @@ -51,7 +52,7 @@ class PerTapSinkHandleManager {
/**
* Submit a buffered or streamed trace segment to all managed per-tap sink handles.
*/
virtual void submitTrace(const TraceWrapperSharedPtr& trace) PURE;
virtual void submitTrace(TraceWrapperPtr&& trace) PURE;
};

using PerTapSinkHandleManagerPtr = std::unique_ptr<PerTapSinkHandleManager>;
Expand Down
7 changes: 3 additions & 4 deletions source/extensions/common/tap/tap_config_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,13 @@ void Utility::bodyBytesToString(envoy::data::tap::v2alpha::TraceWrapper& trace,
}
}

void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(
const TraceWrapperSharedPtr& trace) {
void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(TraceWrapperPtr&& trace) {
Utility::bodyBytesToString(*trace, parent_.sink_format_);
handle_->submitTrace(trace, parent_.sink_format_);
handle_->submitTrace(std::move(trace), parent_.sink_format_);
}

void FilePerTapSink::FilePerTapSinkHandle::submitTrace(
const TraceWrapperSharedPtr& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) {
if (!output_file_.is_open()) {
std::string path = fmt::format("{}_{}", parent_.config_.path_prefix(), trace_id_);
switch (format) {
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/common/tap/tap_config_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TapConfigBaseImpl : public virtual TapConfig {
: parent_(parent), handle_(parent.sink_to_use_->createPerTapSinkHandle(trace_id)) {}

// PerTapSinkHandleManager
void submitTrace(const TraceWrapperSharedPtr& trace) override;
void submitTrace(TraceWrapperPtr&& trace) override;

private:
TapConfigBaseImpl& parent_;
Expand Down Expand Up @@ -129,7 +129,7 @@ class FilePerTapSink : public Sink {
: parent_(parent), trace_id_(trace_id) {}

// PerTapSinkHandle
void submitTrace(const TraceWrapperSharedPtr& trace,
void submitTrace(TraceWrapperPtr&& trace,
envoy::service::tap::v2alpha::OutputSink::Format format) override;

FilePerTapSink& parent_;
Expand Down
134 changes: 57 additions & 77 deletions source/extensions/filters/http/tap/tap_config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ HttpPerRequestTapperPtr HttpTapConfigImpl::createPerRequestTapper(uint64_t strea
}

void HttpPerRequestTapperImpl::streamRequestHeaders() {
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
request_headers_->iterate(
fillHeaderList,
trace->mutable_http_streamed_trace_segment()->mutable_request_headers()->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}

void HttpPerRequestTapperImpl::onRequestHeaders(const Http::HeaderMap& headers) {
Expand All @@ -51,54 +51,24 @@ void HttpPerRequestTapperImpl::onRequestHeaders(const Http::HeaderMap& headers)

void HttpPerRequestTapperImpl::streamBufferedRequestBody() {
if (buffered_streamed_request_body_ != nullptr) {
sink_handle_->submitTrace(buffered_streamed_request_body_);
sink_handle_->submitTrace(std::move(buffered_streamed_request_body_));
buffered_streamed_request_body_.reset();
}
}

void HttpPerRequestTapperImpl::onRequestBody(const Buffer::Instance& data) {
// TODO(mattklein123): Body matching.
if (config_->streaming()) {
const auto match_status = config_->rootMatcher().matchStatus(statuses_);
// Without body matching, we must have already started tracing or have not yet matched.
ASSERT(started_streaming_trace_ || !match_status.matches_);

if (started_streaming_trace_) {
// If we have already started streaming, flush a body segment now.
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::Utility::addBufferToProtoBytes(
*trace->mutable_http_streamed_trace_segment()->mutable_request_body_chunk(),
config_->maxBufferedRxBytes(), data, 0, data.length());
sink_handle_->submitTrace(trace);
} else if (match_status.might_change_status_) {
// If we might still match, start buffering the request body up to our limit.
if (buffered_streamed_request_body_ == nullptr) {
buffered_streamed_request_body_ = makeTraceSegment();
}
auto& body = *buffered_streamed_request_body_->mutable_http_streamed_trace_segment()
->mutable_request_body_chunk();
ASSERT(body.as_bytes().size() <= config_->maxBufferedRxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedRxBytes() - body.as_bytes().size(), data, 0, data.length());
}
} else {
// If we are not streaming, buffer the request body up to our limit.
makeBufferedFullTraceIfNeeded();
auto& body =
*buffered_full_trace_->mutable_http_buffered_trace()->mutable_request()->mutable_body();
ASSERT(body.as_bytes().size() <= config_->maxBufferedRxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedRxBytes() - body.as_bytes().size(), data, 0, data.length());
}
onBody(data, buffered_streamed_request_body_, config_->maxBufferedRxBytes(),
&envoy::data::tap::v2alpha::HttpStreamedTraceSegment::mutable_request_body_chunk,
&envoy::data::tap::v2alpha::HttpBufferedTrace::mutable_request);
}

void HttpPerRequestTapperImpl::streamRequestTrailers() {
if (request_trailers_ != nullptr) {
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
request_trailers_->iterate(fillHeaderList, trace->mutable_http_streamed_trace_segment()
->mutable_request_trailers()
->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}
}

Expand All @@ -118,11 +88,11 @@ void HttpPerRequestTapperImpl::onRequestTrailers(const Http::HeaderMap& trailers
}

void HttpPerRequestTapperImpl::streamResponseHeaders() {
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
response_headers_->iterate(
fillHeaderList,
trace->mutable_http_streamed_trace_segment()->mutable_response_headers()->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}

void HttpPerRequestTapperImpl::onResponseHeaders(const Http::HeaderMap& headers) {
Expand All @@ -143,45 +113,15 @@ void HttpPerRequestTapperImpl::onResponseHeaders(const Http::HeaderMap& headers)

void HttpPerRequestTapperImpl::streamBufferedResponseBody() {
if (buffered_streamed_response_body_ != nullptr) {
sink_handle_->submitTrace(buffered_streamed_response_body_);
sink_handle_->submitTrace(std::move(buffered_streamed_response_body_));
buffered_streamed_response_body_.reset();
}
}

void HttpPerRequestTapperImpl::onResponseBody(const Buffer::Instance& data) {
// TODO(mattklein123): Body matching.
if (config_->streaming()) {
const auto match_status = config_->rootMatcher().matchStatus(statuses_);
// Without body matching, we must have already started tracing or have not yet matched.
ASSERT(started_streaming_trace_ || !match_status.matches_);

if (started_streaming_trace_) {
// If we have already started streaming, flush a body segment now.
TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::Utility::addBufferToProtoBytes(
*trace->mutable_http_streamed_trace_segment()->mutable_response_body_chunk(),
config_->maxBufferedTxBytes(), data, 0, data.length());
sink_handle_->submitTrace(trace);
} else if (match_status.might_change_status_) {
// If we might still match, start buffering the request body up to our limit.
if (buffered_streamed_response_body_ == nullptr) {
buffered_streamed_response_body_ = makeTraceSegment();
}
auto& body = *buffered_streamed_response_body_->mutable_http_streamed_trace_segment()
->mutable_response_body_chunk();
ASSERT(body.as_bytes().size() <= config_->maxBufferedTxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedTxBytes() - body.as_bytes().size(), data, 0, data.length());
}
} else {
// If we are not streaming, buffer the response body up to our limit.
makeBufferedFullTraceIfNeeded();
auto& body =
*buffered_full_trace_->mutable_http_buffered_trace()->mutable_response()->mutable_body();
ASSERT(body.as_bytes().size() <= config_->maxBufferedTxBytes());
TapCommon::Utility::addBufferToProtoBytes(
body, config_->maxBufferedTxBytes() - body.as_bytes().size(), data, 0, data.length());
}
onBody(data, buffered_streamed_response_body_, config_->maxBufferedTxBytes(),
&envoy::data::tap::v2alpha::HttpStreamedTraceSegment::mutable_response_body_chunk,
&envoy::data::tap::v2alpha::HttpBufferedTrace::mutable_response);
}

void HttpPerRequestTapperImpl::onResponseTrailers(const Http::HeaderMap& trailers) {
Expand All @@ -198,11 +138,11 @@ void HttpPerRequestTapperImpl::onResponseTrailers(const Http::HeaderMap& trailer
streamBufferedResponseBody();
}

TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment();
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
trailers.iterate(fillHeaderList, trace->mutable_http_streamed_trace_segment()
->mutable_response_trailers()
->mutable_headers());
sink_handle_->submitTrace(trace);
sink_handle_->submitTrace(std::move(trace));
}
}

Expand All @@ -227,10 +167,50 @@ bool HttpPerRequestTapperImpl::onDestroyLog() {
}

ENVOY_LOG(debug, "submitting buffered trace sink");
sink_handle_->submitTrace(buffered_full_trace_);
// move is safe as onDestroyLog is the last method called.
sink_handle_->submitTrace(std::move(buffered_full_trace_));
return true;
}

void HttpPerRequestTapperImpl::onBody(
const Buffer::Instance& data, Extensions::Common::Tap::TraceWrapperPtr& buffered_streamed_body,
uint32_t maxBufferedBytes, MutableBodyChunk mutable_body_chunk,
MutableMessage mutable_message) {
// TODO(mattklein123): Body matching.
if (config_->streaming()) {
const auto match_status = config_->rootMatcher().matchStatus(statuses_);
// Without body matching, we must have already started tracing or have not yet matched.
ASSERT(started_streaming_trace_ || !match_status.matches_);

if (started_streaming_trace_) {
// If we have already started streaming, flush a body segment now.
TapCommon::TraceWrapperPtr trace = makeTraceSegment();
TapCommon::Utility::addBufferToProtoBytes(
*(trace->mutable_http_streamed_trace_segment()->*mutable_body_chunk)(), maxBufferedBytes,
data, 0, data.length());
sink_handle_->submitTrace(std::move(trace));
} else if (match_status.might_change_status_) {
// If we might still match, start buffering the body up to our limit.
if (buffered_streamed_body == nullptr) {
buffered_streamed_body = makeTraceSegment();
}
auto& body =
*(buffered_streamed_body->mutable_http_streamed_trace_segment()->*mutable_body_chunk)();
ASSERT(body.as_bytes().size() <= maxBufferedBytes);
TapCommon::Utility::addBufferToProtoBytes(body, maxBufferedBytes - body.as_bytes().size(),
data, 0, data.length());
}
} else {
// If we are not streaming, buffer the body up to our limit.
makeBufferedFullTraceIfNeeded();
auto& body =
*(buffered_full_trace_->mutable_http_buffered_trace()->*mutable_message)()->mutable_body();
ASSERT(body.as_bytes().size() <= maxBufferedBytes);
TapCommon::Utility::addBufferToProtoBytes(body, maxBufferedBytes - body.as_bytes().size(), data,
0, data.length());
}
}

} // namespace TapFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
21 changes: 15 additions & 6 deletions source/extensions/filters/http/tap/tap_config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,24 @@ class HttpPerRequestTapperImpl : public HttpPerRequestTapper, Logger::Loggable<L
bool onDestroyLog() override;

private:
typedef envoy::data::tap::v2alpha::Body* (
envoy::data::tap::v2alpha::HttpStreamedTraceSegment::*MutableBodyChunk)();
typedef envoy::data::tap::v2alpha::HttpBufferedTrace::Message* (
envoy::data::tap::v2alpha::HttpBufferedTrace::*MutableMessage)();

void onBody(const Buffer::Instance& data,
Extensions::Common::Tap::TraceWrapperPtr& buffered_streamed_body,
uint32_t maxBufferedBytes, MutableBodyChunk mutable_body_chunk,
MutableMessage mutable_message);

void makeBufferedFullTraceIfNeeded() {
if (buffered_full_trace_ == nullptr) {
buffered_full_trace_ = Extensions::Common::Tap::makeTraceWrapper();
}
}

Extensions::Common::Tap::TraceWrapperSharedPtr makeTraceSegment() {
Extensions::Common::Tap::TraceWrapperSharedPtr segment =
Extensions::Common::Tap::makeTraceWrapper();
Extensions::Common::Tap::TraceWrapperPtr makeTraceSegment() {
Extensions::Common::Tap::TraceWrapperPtr segment = Extensions::Common::Tap::makeTraceWrapper();
segment->mutable_http_streamed_trace_segment()->set_trace_id(stream_id_);
return segment;
}
Expand All @@ -71,9 +80,9 @@ class HttpPerRequestTapperImpl : public HttpPerRequestTapper, Logger::Loggable<L
const Http::HeaderMap* response_headers_{};
const Http::HeaderMap* response_trailers_{};
// Must be a shared_ptr because of submitTrace().
Extensions::Common::Tap::TraceWrapperSharedPtr buffered_streamed_request_body_;
Extensions::Common::Tap::TraceWrapperSharedPtr buffered_streamed_response_body_;
Extensions::Common::Tap::TraceWrapperSharedPtr buffered_full_trace_;
Extensions::Common::Tap::TraceWrapperPtr buffered_streamed_request_body_;
Extensions::Common::Tap::TraceWrapperPtr buffered_streamed_response_body_;
Extensions::Common::Tap::TraceWrapperPtr buffered_full_trace_;
};

} // namespace TapFilter
Expand Down
Loading