Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXPORTER] Async exporting for otlp grpc #2407

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3e840b0
Use protobuf arena to reduce memory fragments and improve performence…
owent Nov 14, 2023
9b3b057
Add async exporting for OTLP gRPC exporter
owent Nov 15, 2023
74175a3
Fix OTLP HTTP exporter without async exporting.
owent Nov 15, 2023
3cc2776
Fix condition_variable including
owent Nov 15, 2023
aeb7b55
Fix compile problem without `ENABLE_ASYNC_EXPORT`
owent Nov 16, 2023
25419f6
Use callback API for OTLP gRPC exporter.
owent Nov 16, 2023
e7f119a
Fix unit test for OTLP gRPC async exporting.
owent Nov 16, 2023
4301f23
Fix grpcpp header
owent Nov 16, 2023
1862010
There is no compatible way to detect gRPC version.
owent Nov 16, 2023
5fef32a
Fix style and benchmark
owent Nov 16, 2023
0dc3f73
Fix styles
owent Nov 16, 2023
0951836
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Nov 19, 2023
a9c538d
Merge remote-tracking branch 'github/main' into async_exporting_for_o…
owent Nov 21, 2023
ae7dc10
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Nov 28, 2023
57ebf34
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Nov 30, 2023
a3c78d2
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Dec 1, 2023
0dadc1c
Fix feedbacks.
owent Dec 1, 2023
ac964f3
Fix style
owent Dec 1, 2023
5c7c352
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Dec 5, 2023
54fb976
Merge remote-tracking branch 'github/main' into async_exporting_for_o…
owent Dec 8, 2023
692cfed
Fixes format
owent Dec 8, 2023
49c3b7d
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Dec 11, 2023
be4bcd1
Merge remote-tracking branch 'github/main' into async_exporting_for_o…
owent Dec 15, 2023
3a53ed9
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
marcalff Dec 18, 2023
7c66023
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Dec 27, 2023
6587bd3
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Jan 4, 2024
4c892a7
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Jan 6, 2024
63489fa
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
marcalff Jan 8, 2024
05525fd
Cleanup unused methods
owent Jan 10, 2024
d07cdf6
Make constructor and destructors always available.
owent Jan 10, 2024
cdb8816
Fix wair_for in destructor
owent Jan 12, 2024
7dd1bd3
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Jan 18, 2024
1cc3f1c
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
ThomsonTan Jan 18, 2024
d0f1add
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
owent Jan 21, 2024
daccf33
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
ThomsonTan Jan 24, 2024
d86cb39
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
esigo Jan 27, 2024
95f7614
Merge remote-tracking branch 'github/main' into async_exporting_for_o…
owent Feb 1, 2024
0f4c77f
Add comments for old version of gRPC, make `is_shutdown_` thread-safety
owent Feb 1, 2024
277b73b
Merge remote-tracking branch 'origin/async_exporting_for_otlp_grpc_ex…
owent Feb 1, 2024
37f7dbd
Merge branch 'main' into async_exporting_for_otlp_grpc_exporter
ThomsonTan Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Increment the:

## [Unreleased]

* [EXPORTER] Add async exporting for OTLP/GRPC exporter
[#2407](https://github.com/open-telemetry/opentelemetry-cpp/pull/2407)
* [API] Fix b3, w3c and jaeger propagators: they will not overwrite
the active span with a default invalid span, which is especially useful
when used with CompositePropagator
Expand Down
3 changes: 3 additions & 0 deletions api/include/opentelemetry/common/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,19 @@ point.
#if defined(__clang__)

# define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default")))
# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden")))

#elif defined(__GNUC__)

# define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default")))
# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden")))

#else

/* Add support for other compilers here. */

# define OPENTELEMETRY_API_SINGLETON
# define OPENTELEMETRY_LOCAL_SYMBOL

#endif

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@

#pragma once

#include <grpcpp/completion_queue.h>
#include <grpcpp/grpcpp.h>

#include <atomic>
#include <memory>

#include "opentelemetry/sdk/common/exporter_utils.h"

#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"

#include "google/protobuf/arena.h"
#include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
Expand All @@ -25,12 +30,20 @@ namespace otlp

struct OtlpGrpcClientOptions;

#ifdef ENABLE_ASYNC_EXPORT
struct OtlpGrpcClientAsyncData;
#endif

/**
* The OTLP gRPC client contains utility functions of gRPC.
*/
class OtlpGrpcClient
{
public:
OtlpGrpcClient();
marcalff marked this conversation as resolved.
Show resolved Hide resolved

~OtlpGrpcClient();

/**
* Create gRPC channel from the exporter options.
*/
Expand All @@ -42,11 +55,6 @@ class OtlpGrpcClient
static std::unique_ptr<grpc::ClientContext> MakeClientContext(
const OtlpGrpcClientOptions &options);

/**
* Create gRPC CompletionQueue to async call RPC.
*/
static std::unique_ptr<grpc::CompletionQueue> MakeCompletionQueue();

/**
* Create trace service stub to communicate with the OpenTelemetry Collector.
*/
Expand All @@ -67,21 +75,109 @@ class OtlpGrpcClient

static grpc::Status DelegateExport(
proto::collector::trace::v1::TraceService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::trace::v1::ExportTraceServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest &&request,
proto::collector::trace::v1::ExportTraceServiceResponse *response);

static grpc::Status DelegateExport(
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::metrics::v1::ExportMetricsServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest &&request,
proto::collector::metrics::v1::ExportMetricsServiceResponse *response);

static grpc::Status DelegateExport(
proto::collector::logs::v1::LogsService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::logs::v1::ExportLogsServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
proto::collector::logs::v1::ExportLogsServiceResponse *response);

#ifdef ENABLE_ASYNC_EXPORT

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::trace::v1::TraceService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::trace::v1::ExportTraceServiceRequest &,
proto::collector::trace::v1::ExportTraceServiceResponse *)>
&&result_callback) noexcept;

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::metrics::v1::ExportMetricsServiceRequest &,
proto::collector::metrics::v1::ExportMetricsServiceResponse *)>
&&result_callback) noexcept;

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::logs::v1::LogsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::logs::v1::ExportLogsServiceRequest &,
proto::collector::logs::v1::ExportLogsServiceResponse *)>
&&result_callback) noexcept;

/**
* Force flush the gRPC client.
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* Shut down the gRPC client.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
* @return return the status of this operation
*/
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept;

std::shared_ptr<OtlpGrpcClientAsyncData> MutableAsyncData(const OtlpGrpcClientOptions &options);

private:
// Stores if this gRPC client had its Shutdown() method called
std::atomic<bool> is_shutdown_;

// Stores shared data between threads of this gRPC client
std::shared_ptr<OtlpGrpcClientAsyncData> async_data_;
#endif
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ struct OtlpGrpcClientOptions

/** User agent. */
std::string user_agent;

/** max number of threads that can be allocated from this */
std::size_t max_threads;

#ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
std::size_t max_concurrent_requests;
#endif
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be moved to otlp_grpc_exporter_options.h as a central place?

Copy link
Member Author

@owent owent Jan 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users use the factory and otlp_grpc*_options.h to create OTLP gRPC exporters, but I think users do not need to know the internal class OtlpGrpcClient?


/**
* The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format.
*/
Expand Down Expand Up @@ -73,6 +75,10 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
// The configuration options associated with this exporter.
const OtlpGrpcExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// For testing
friend class OtlpGrpcExporterTestPeer;
friend class OtlpGrpcLogRecordExporterTestPeer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format in gRPC.
*/
Expand Down Expand Up @@ -73,6 +75,10 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo
// Configuration options for the exporter
const OtlpGrpcLogRecordExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// For testing
friend class OtlpGrpcLogRecordExporterTestPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports metrics data in OpenTelemetry Protocol (OTLP) format in gRPC.
*/
Expand Down Expand Up @@ -59,6 +61,10 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp
// The configuration options associated with this exporter.
const OtlpGrpcMetricExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// Aggregation Temporality selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

Expand Down
Loading