Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: add opentelemetry flight instructions #298

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cpp/code/.clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@
---
Checks: '*,-llvmlibc*,-cert-err58-cpp,-modernize-use-trailing-return-type,-fuchsia-*,-cppcoreguidelines-*,
-readability-magic-numbers,-clang-analyzer-cplusplus.NewDelete,-clang-analyzer-cplusplus.NewDeleteLeaks,
-readability-function-cognitive-complexity, -hicpp-special-member-functions'
-readability-function-cognitive-complexity, -hicpp-special-member-functions
-readability-function-cognitive-complexity,-readability-named-parameter,
-readability-convert-member-functions-to-static,
-readability-identifier-length,-hicpp-special-member-functions,-hicpp-named-parameter,
-altera-*,-misc-no-recursion,-llvm-include-order,
-misc-non-private-member-variables-in-classes'
WarningsAsErrors: '*'
FormatStyle: 'file'
CheckOptions:
- key: performance-unnecessary-value-param.AllowedTypes
value: 'shared_ptr'
- key: performance-for-range-copy.AllowedTypes
value: 'shared_ptr'
10 changes: 6 additions & 4 deletions cpp/code/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ cmake_minimum_required(VERSION 3.19)
project(arrow-cookbook)

set(CMAKE_CXX_STANDARD 17)
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libstdc++")
endif()
# if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libstdc++")
# endif()

# Add Arrow and other required packages
find_package(Arrow REQUIRED)
Expand All @@ -32,9 +32,10 @@ endif()
find_package(ArrowDataset REQUIRED)
find_package(ArrowFlight REQUIRED)
find_package(Parquet REQUIRED)
find_package(opentelemetry-cpp REQUIRED)

if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
# set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
endif()

# Create test targets
Expand Down Expand Up @@ -89,6 +90,7 @@ target_link_libraries(flight
protobuf::libprotobuf
gRPC::grpc
gRPC::grpc++
opentelemetry-cpp::ostream_span_exporter
)
target_include_directories(flight PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
get_target_property(grpc_cpp_plugin_location gRPC::grpc_cpp_plugin LOCATION)
Expand Down
108 changes: 100 additions & 8 deletions cpp/code/flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/flight/client.h>
#include <arrow/flight/client_tracing_middleware.h>
#include <arrow/flight/server.h>
#include <arrow/flight/server_tracing_middleware.h>
#include <arrow/pretty_print.h>
#include <arrow/result.h>
#include <arrow/status.h>
Expand All @@ -28,6 +30,17 @@
#include <gmock/gmock.h>
#include <grpc++/grpc++.h>
#include <gtest/gtest.h>
#include <opentelemetry/context/propagation/global_propagator.h>
#include <opentelemetry/context/propagation/text_map_propagator.h>
#include <opentelemetry/exporters/ostream/span_exporter_factory.h>
#include <opentelemetry/sdk/common/global_log_handler.h>
#include <opentelemetry/sdk/trace/simple_processor_factory.h>
#include <opentelemetry/sdk/trace/tracer.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/sdk/trace/tracer_provider_factory.h>
#include <opentelemetry/trace/propagation/http_trace_context.h>
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/scope.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <protos/helloworld.grpc.pb.h>
Expand Down Expand Up @@ -148,7 +161,7 @@ class ParquetStorageService : public arrow::flight::FlightServerBase {
endpoint.ticket.ticket = file_info.base_name();
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", port()));
arrow::flight::Location::ForGrpcTcp("localhost", port()));
endpoint.locations.push_back(location);

int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
Expand Down Expand Up @@ -197,7 +210,7 @@ arrow::Status TestPutGetDelete() {

arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));

arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
Expand All @@ -209,7 +222,7 @@ arrow::Status TestPutGetDelete() {
StartRecipe("ParquetStorageService::Connect");
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
Expand Down Expand Up @@ -317,7 +330,7 @@ arrow::Status TestClientOptions() {

arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));

arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
Expand All @@ -331,12 +344,12 @@ arrow::Status TestClientOptions() {

arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
arrow::flight::FlightClient::Connect(location, client_options));
arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
EndRecipe("TestClientOptions::Connect");

Expand All @@ -354,7 +367,7 @@ arrow::Status TestCustomGrpcImpl() {
StartRecipe("CustomGrpcImpl::StartServer");
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 5000));
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 3000));

arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
Expand All @@ -375,7 +388,7 @@ arrow::Status TestCustomGrpcImpl() {

StartRecipe("CustomGrpcImpl::CreateClient");
auto client_channel =
grpc::CreateChannel("0.0.0.0:5000", grpc::InsecureChannelCredentials());
grpc::CreateChannel("0.0.0.0:3000", grpc::InsecureChannelCredentials());

auto stub = HelloWorldService::NewStub(client_channel);

Expand All @@ -393,10 +406,89 @@ arrow::Status TestCustomGrpcImpl() {
return arrow::Status::OK();
}

arrow::Status TestPropagateSpansImpl() {
StartRecipe("PropagateSpansImpl::SetGlobalPropagator");
namespace trace_propagation = opentelemetry::context::propagation;
trace_propagation::GlobalTextMapPropagator::SetGlobalPropagator(
opentelemetry::nostd::shared_ptr<trace_propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));
EndRecipe("PropagateSpansImpl::SetGlobalPropagator");

StartRecipe("PropagateSpansImpl::SetTraceProvider");
namespace trace_api = opentelemetry::trace;
namespace trace_sdk = opentelemetry::sdk::trace;

std::stringstream trace_out;

auto os_exporter =
opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create(trace_out);
auto os_processor =
trace_sdk::SimpleSpanProcessorFactory::Create(std::move(os_exporter));
auto provider = trace_sdk::TracerProviderFactory::Create(std::move(os_processor));
trace_api::Provider::SetTracerProvider(std::move(provider));
EndRecipe("PropagateSpansImpl::SetTraceProvider");

// Build flight service as usual
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);

auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
new ParquetStorageService(std::move(root)));

arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 3000));

StartRecipe("PropagateSpansImpl::AddServerMiddleware");
arrow::flight::FlightServerOptions options(server_location);

options.middleware.emplace_back("tracing",
arrow::flight::MakeTracingServerMiddlewareFactory());
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
EndRecipe("PropagateSpansImpl::AddServerMiddleware");

StartRecipe("PropagateSpansImpl::AddClientMiddleware");
auto client_options = arrow::flight::FlightClientOptions::Defaults();
client_options.middleware.emplace_back(
arrow::flight::MakeTracingClientMiddlewareFactory());

arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
EndRecipe("PropagateSpansImpl::AddClientMiddleware");

StartRecipe("PropagateSpansImpl::MakeTracedCall");
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("my_client", "1.0.0");
auto span = tracer->StartSpan("test");
auto scope = tracer->WithActiveSpan(span);

auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
auto _ = client->GetFlightInfo(descriptor);
}
rout << trace_out.str() << std::endl;
EndRecipe("PropagateSpansImpl::MakeTracedCall");

return arrow::Status::OK();
}

TEST(ParquetStorageServiceTest, PutGetDelete) { ASSERT_OK(TestPutGetDelete()); }
TEST(ParquetStorageServiceTest, TestClientOptions) {
auto status = TestClientOptions();
ASSERT_EQ(status.code(), arrow::StatusCode::Invalid);
ASSERT_THAT(status.message(), testing::HasSubstr("resource exhausted"));
}
TEST(ParquetStorageServiceTest, TestCustomGrpcImpl) { ASSERT_OK(TestCustomGrpcImpl()); }
TEST(ParquetStorageServiceTest, TestPropagateSpansImpl) {
ASSERT_OK(TestPropagateSpansImpl());
}
2 changes: 2 additions & 0 deletions cpp/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ dependencies:
- gmock
- pyarrow==10.0.1
- clang-tools
- cpp-opentelemetry-api
- cpp-opentelemetry-sdk
49 changes: 49 additions & 0 deletions cpp/source/flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,52 @@ gRPC definition.

.. recipe:: ../code/flight.cc CustomGrpcImpl::CreateClient
:dedent: 2


Propagating OpenTelemetry Spans
===============================


Configuring the server
----------------------

In order to get OpenTelemetry spans from clients to the servers, you must add
the following to your server:

1. Register the global OpenTelemetry propagator.
2. Setup OpenTelemetry SDK with a ``TraceProvider`` so that spans have somewhere to go.
3. Add the Arrow tracing middleware in the server options before initializing.

Setting the global propagator makes sure that OpenTelemetry knows how to extract
the trace information from the transport headers.

.. recipe:: ../code/flight.cc PropagateSpansImpl::SetGlobalPropagator
:dedent: 2

Next, traces need to be collected somewhere, otherwise all OpenTelemetry operations
will be a no-op. This should be configured to export to your preferred telemetry
system. For the purposes of the example, we just use the output stream exporter.

.. recipe:: ../code/flight.cc PropagateSpansImpl::SetTraceProvider
:dedent: 2

Finally, we can add the middleware to the server option and initialize the server.

.. recipe:: ../code/flight.cc PropagateSpansImpl::AddServerMiddleware
:dedent: 2


Configuring the client
----------------------

Clients need to follow the same steps as the server, except they should use the
client middleware.

.. recipe:: ../code/flight.cc PropagateSpansImpl::AddClientMiddleware
:dedent: 2


Then we can make the call:

.. recipe:: ../code/flight.cc PropagateSpansImpl::MakeTracedCall
:dedent: 2