-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tracing: Add support for sending data in Zipkin v2 format #6985
Changes from 39 commits
d445cc8
ac13b27
7fdbeb9
07dfb1d
07c897b
9aafd95
8685f63
4c58100
c6f86c8
c775be0
d60b37f
d03f5f4
a75668b
544522c
7c925bc
53d0ded
12015fe
4c7542f
f7601d0
2a2746a
e4cf785
66ea6dd
240c9a1
8c0480d
ee21280
acfa29d
50aec33
bd462dc
273c92c
756261b
49f6bf5
3d3fcae
336fe07
57625de
f303859
93cbf23
19f3bb4
ba7bf9c
199d28e
10f220d
7971c73
f21f54e
6a39da4
d91e756
b00b470
7c3a278
a1c8cc7
f413994
17e24d8
e602b50
3f4c927
527f62c
c409f91
3976724
ec4ca84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,29 @@ | ||
#include "extensions/tracers/zipkin/span_buffer.h" | ||
|
||
#include "common/protobuf/protobuf.h" | ||
|
||
#include "extensions/tracers/zipkin/util.h" | ||
#include "extensions/tracers/zipkin/zipkin_core_constants.h" | ||
|
||
namespace Envoy { | ||
namespace Extensions { | ||
namespace Tracers { | ||
namespace Zipkin { | ||
|
||
SpanBuffer::SpanBuffer( | ||
const envoy::config::trace::v2::ZipkinConfig::CollectorEndpointVersion& version, | ||
const bool shared_span_context) | ||
: serializer_{makeSerializer(version, shared_span_context)} {} | ||
|
||
SpanBuffer::SpanBuffer( | ||
const envoy::config::trace::v2::ZipkinConfig::CollectorEndpointVersion& version, | ||
const bool shared_span_context, uint64_t size) | ||
: serializer_{makeSerializer(version, shared_span_context)} { | ||
allocateBuffer(size); | ||
} | ||
|
||
// TODO(fabolive): Need to avoid the copy to improve performance. | ||
dio marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bool SpanBuffer::addSpan(const Span& span) { | ||
bool SpanBuffer::addSpan(Span&& span) { | ||
if (span_buffer_.size() == span_buffer_.capacity()) { | ||
// Buffer full | ||
return false; | ||
|
@@ -16,22 +33,202 @@ bool SpanBuffer::addSpan(const Span& span) { | |
return true; | ||
} | ||
|
||
std::string SpanBuffer::toStringifiedJsonArray() { | ||
SerializerPtr SpanBuffer::makeSerializer( | ||
const envoy::config::trace::v2::ZipkinConfig::CollectorEndpointVersion& version, | ||
const bool shared_span_context) { | ||
switch (version) { | ||
case envoy::config::trace::v2::ZipkinConfig::HTTP_JSON_V1: | ||
return std::make_unique<JsonV1Serializer>(); | ||
case envoy::config::trace::v2::ZipkinConfig::HTTP_JSON: | ||
return std::make_unique<JsonV2Serializer>(shared_span_context); | ||
case envoy::config::trace::v2::ZipkinConfig::HTTP_PROTO: | ||
return std::make_unique<ProtobufSerializer>(shared_span_context); | ||
default: | ||
NOT_REACHED_GCOVR_EXCL_LINE; | ||
} | ||
} | ||
|
||
std::string JsonV1Serializer::serialize(std::vector<Span>&& zipkin_spans) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc. @basvanbeek There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes I like this better :) |
||
std::string stringified_json_array = "["; | ||
|
||
if (pendingSpans()) { | ||
stringified_json_array += span_buffer_[0].toJson(); | ||
const uint64_t size = span_buffer_.size(); | ||
for (uint64_t i = 1; i < size; i++) { | ||
stringified_json_array += ","; | ||
stringified_json_array += span_buffer_[i].toJson(); | ||
if (!zipkin_spans.empty()) { | ||
absl::StrAppend(&stringified_json_array, zipkin_spans[0].toJson()); | ||
for (uint64_t i = 1; i < zipkin_spans.size(); i++) { | ||
absl::StrAppend(&stringified_json_array, ",", zipkin_spans[i].toJson()); | ||
} | ||
} | ||
stringified_json_array += "]"; | ||
absl::StrAppend(&stringified_json_array, "]"); | ||
|
||
return stringified_json_array; | ||
} | ||
|
||
JsonV2Serializer::JsonV2Serializer(const bool shared_span_context) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❤️ |
||
: shared_span_context_{shared_span_context} {} | ||
|
||
std::string JsonV2Serializer::serialize(std::vector<Span>&& zipkin_spans) { | ||
std::vector<zipkin::jsonv2::Span> spans; | ||
spans.reserve(zipkin_spans.size() * 2); | ||
dio marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for (const Span& zipkin_span : zipkin_spans) { | ||
const std::vector<zipkin::jsonv2::Span>& converted = toListOfSpans(zipkin_span); | ||
std::copy(converted.begin(), converted.end(), std::back_inserter(spans)); | ||
} | ||
|
||
std::string stringified_json_array = "["; | ||
const uint64_t spans_size = spans.size(); | ||
if (spans_size > 0) { | ||
std::string entry; | ||
Protobuf::util::MessageToJsonString(spans.at(0), &entry); | ||
absl::StrAppend(&stringified_json_array, entry); | ||
for (uint64_t i = 1; i < spans_size; i++) { | ||
entry.clear(); | ||
Protobuf::util::MessageToJsonString(spans.at(i), &entry); | ||
absl::StrAppend(&stringified_json_array, ",", entry); | ||
} | ||
} | ||
absl::StrAppend(&stringified_json_array, "]"); | ||
|
||
return stringified_json_array; | ||
} | ||
|
||
const std::vector<zipkin::jsonv2::Span> | ||
JsonV2Serializer::toListOfSpans(const Span& zipkin_span) const { | ||
std::vector<zipkin::jsonv2::Span> spans; | ||
spans.reserve(zipkin_span.annotations().size()); | ||
for (const auto& annotation : zipkin_span.annotations()) { | ||
zipkin::jsonv2::Span span; | ||
|
||
if (annotation.value() == ZipkinCoreConstants::get().CLIENT_SEND) { | ||
span.set_kind(ZipkinCoreConstants::get().KIND_CLIENT); | ||
} else if (annotation.value() == ZipkinCoreConstants::get().SERVER_RECV) { | ||
span.set_shared(shared_span_context_ && zipkin_span.annotations().size() > 1); | ||
span.set_kind(ZipkinCoreConstants::get().KIND_SERVER); | ||
} else { | ||
continue; | ||
} | ||
|
||
if (annotation.isSetEndpoint()) { | ||
span.set_timestamp(annotation.timestamp()); | ||
span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint())); | ||
} | ||
|
||
span.set_trace_id(zipkin_span.traceIdAsHexString()); | ||
if (zipkin_span.isSetParentId()) { | ||
span.set_parent_id(zipkin_span.parentIdAsHexString()); | ||
} | ||
|
||
span.set_id(zipkin_span.idAsHexString()); | ||
span.set_name(zipkin_span.name()); | ||
|
||
if (zipkin_span.isSetDuration()) { | ||
span.set_duration(zipkin_span.duration()); | ||
} | ||
|
||
auto& tags = *span.mutable_tags(); | ||
for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) { | ||
tags[binary_annotation.key()] = binary_annotation.value(); | ||
} | ||
|
||
spans.push_back(span); | ||
dio marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return spans; | ||
} | ||
|
||
const zipkin::jsonv2::Endpoint | ||
JsonV2Serializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const { | ||
zipkin::jsonv2::Endpoint endpoint; | ||
Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address(); | ||
if (address) { | ||
if (address->ip()->version() == Network::Address::IpVersion::v4) { | ||
endpoint.set_ipv4(address->ip()->addressAsString()); | ||
} else { | ||
endpoint.set_ipv6(address->ip()->addressAsString()); | ||
} | ||
endpoint.set_port(address->ip()->port()); | ||
} | ||
|
||
const std::string& service_name = zipkin_endpoint.serviceName(); | ||
if (!service_name.empty()) { | ||
endpoint.set_service_name(service_name); | ||
} | ||
|
||
return endpoint; | ||
} | ||
|
||
ProtobufSerializer::ProtobufSerializer(const bool shared_span_context) | ||
: shared_span_context_{shared_span_context} {} | ||
|
||
std::string ProtobufSerializer::serialize(std::vector<Span>&& zipkin_spans) { | ||
zipkin::proto3::ListOfSpans spans; | ||
for (const Span& zipkin_span : zipkin_spans) { | ||
spans.MergeFrom(toListOfSpans(zipkin_span)); | ||
} | ||
std::string serialized; | ||
spans.SerializeToString(&serialized); | ||
return serialized; | ||
} | ||
|
||
const zipkin::proto3::ListOfSpans ProtobufSerializer::toListOfSpans(const Span& zipkin_span) const { | ||
zipkin::proto3::ListOfSpans spans; | ||
for (const auto& annotation : zipkin_span.annotations()) { | ||
zipkin::proto3::Span span; | ||
if (annotation.value() == ZipkinCoreConstants::get().CLIENT_SEND) { | ||
span.set_kind(zipkin::proto3::Span::CLIENT); | ||
} else if (annotation.value() == ZipkinCoreConstants::get().SERVER_RECV) { | ||
span.set_shared(shared_span_context_ && zipkin_span.annotations().size() > 1); | ||
span.set_kind(zipkin::proto3::Span::SERVER); | ||
} else { | ||
continue; | ||
} | ||
|
||
if (annotation.isSetEndpoint()) { | ||
span.set_timestamp(annotation.timestamp()); | ||
span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint())); | ||
} | ||
|
||
span.set_trace_id(zipkin_span.traceIdAsByteString()); | ||
if (zipkin_span.isSetParentId()) { | ||
span.set_parent_id(zipkin_span.parentIdAsByteString()); | ||
} | ||
|
||
span.set_id(zipkin_span.idAsByteString()); | ||
span.set_name(zipkin_span.name()); | ||
|
||
if (zipkin_span.isSetDuration()) { | ||
span.set_duration(zipkin_span.duration()); | ||
} | ||
|
||
auto& tags = *span.mutable_tags(); | ||
for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) { | ||
tags[binary_annotation.key()] = binary_annotation.value(); | ||
} | ||
|
||
auto* mutable_span = spans.add_spans(); | ||
mutable_span->MergeFrom(span); | ||
} | ||
return spans; | ||
} | ||
|
||
const zipkin::proto3::Endpoint | ||
ProtobufSerializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const { | ||
zipkin::proto3::Endpoint endpoint; | ||
Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address(); | ||
if (address) { | ||
if (address->ip()->version() == Network::Address::IpVersion::v4) { | ||
endpoint.set_ipv4(Util::toByteString(address->ip()->ipv4()->address())); | ||
} else { | ||
endpoint.set_ipv6(Util::toByteString(address->ip()->ipv6()->address())); | ||
} | ||
endpoint.set_port(address->ip()->port()); | ||
} | ||
|
||
const std::string& service_name = zipkin_endpoint.serviceName(); | ||
if (!service_name.empty()) { | ||
endpoint.set_service_name(service_name); | ||
} | ||
|
||
return endpoint; | ||
} | ||
|
||
} // namespace Zipkin | ||
} // namespace Tracers | ||
} // namespace Extensions | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, this will race with #8003. CC @kyessenov
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After #8003 is merged what should we use here?