Skip to content

Commit

Permalink
Reorder the destructor of members in LoggerProvider and TracerProvider (
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Mar 8, 2022
1 parent cba0a2a commit c96a3e3
Show file tree
Hide file tree
Showing 26 changed files with 150 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable
void SetSeverity(opentelemetry::logs::Severity severity) noexcept override
{
// Convert the severity enum to a string
int severity_index = static_cast<int>(severity);
if (severity_index < 0 ||
severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
std::uint32_t severity_index = static_cast<std::uint32_t>(severity);
if (severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
{
std::stringstream sout;
sout << "Invalid severity(" << severity_index << ")";
Expand Down
7 changes: 3 additions & 4 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,11 @@ sdk::common::ExportResult OStreamLogExporter::Export(
// into severity_num and severity_text
sout_ << "{\n"
<< " timestamp : " << log_record->GetTimestamp().time_since_epoch().count() << "\n"
<< " severity_num : " << static_cast<int>(log_record->GetSeverity()) << "\n"
<< " severity_num : " << static_cast<std::uint32_t>(log_record->GetSeverity()) << "\n"
<< " severity_text : ";

int severity_index = static_cast<int>(log_record->GetSeverity());
if (severity_index < 0 ||
severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
std::uint32_t severity_index = static_cast<std::uint32_t>(log_record->GetSeverity());
if (severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
{
sout_ << "Invalid severity(" << severity_index << ")\n";
}
Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
<< " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
if (spans.empty())
{
return sdk::common::ExportResult::kSuccess;
}

proto::collector::trace::v1::ExportTraceServiceRequest request;
OtlpRecordableUtils::PopulateRequest(spans, &request);

Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
<< " log(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
if (logs.empty())
{
return sdk::common::ExportResult::kSuccess;
}

proto::collector::logs::v1::ExportLogsServiceRequest request;
OtlpRecordableUtils::PopulateRequest(logs, &request);

Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ std::unique_ptr<opentelemetry::sdk::trace::Recordable> OtlpHttpExporter::MakeRec
opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
{
if (spans.empty())
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

proto::collector::trace::v1::ExportTraceServiceRequest service_request;
OtlpRecordableUtils::PopulateRequest(spans, &service_request);
return http_client_->Export(service_request);
Expand Down
4 changes: 4 additions & 0 deletions exporters/otlp/src/otlp_http_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ std::unique_ptr<opentelemetry::sdk::logs::Recordable> OtlpHttpLogExporter::MakeR
opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs) noexcept
{
if (logs.empty())
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}
proto::collector::logs::v1::ExportLogsServiceRequest service_request;
OtlpRecordableUtils::PopulateRequest(logs, &service_request);
return http_client_->Export(service_request);
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/test/otlp_http_log_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest)

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1)));
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5)));

std::string report_trace_id;
std::string report_span_id;
Expand Down Expand Up @@ -192,7 +192,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest)

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1)));
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5)));

std::string report_trace_id;
std::string report_span_id;
Expand Down
4 changes: 3 additions & 1 deletion ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ TEST_F(BasicCurlHttpTests, SendGetRequestSyncTimeout)
auto result = http_client.Get("http://222.222.222.200:19000/get/", m1);
EXPECT_EQ(result, false);

EXPECT_EQ(result.GetSessionState(), http_client::SessionState::ConnectFailed);
// When network is under proxy, it may connect success but closed by peer when send data
EXPECT_TRUE(result.GetSessionState() == http_client::SessionState::ConnectFailed ||
result.GetSessionState() == http_client::SessionState::SendFailed);
}

TEST_F(BasicCurlHttpTests, SendPostRequestSync)
Expand Down
5 changes: 4 additions & 1 deletion sdk/include/opentelemetry/sdk/_metrics/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class PushController
{
if (active_.exchange(false))
{
runner_.join();
if (runner_.joinable())
{
runner_.join();
}
tick(); // flush metrics sitting in the processor
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class BatchLogProcessor : public LogProcessor

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;

/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;
Expand Down
5 changes: 3 additions & 2 deletions sdk/include/opentelemetry/sdk/logs/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ class Logger final : public opentelemetry::logs::Logger
// The name of this logger
std::string logger_name_;

// The logger context of this Logger. Uses a weak_ptr to avoid cyclic dependency issues the with
std::weak_ptr<LoggerContext> context_;
// order of declaration is important here - instrumentation library should destroy after
// logger-context.
std::unique_ptr<instrumentationlibrary::InstrumentationLibrary> instrumentation_library_;
std::shared_ptr<LoggerContext> context_;
};

} // namespace logs
Expand Down
20 changes: 14 additions & 6 deletions sdk/include/opentelemetry/sdk/logs/logger_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class LoggerProvider final : public opentelemetry::logs::LoggerProvider
*/
explicit LoggerProvider(std::shared_ptr<sdk::logs::LoggerContext> context) noexcept;

~LoggerProvider();

/**
* Creates a logger with the given name, and returns a shared pointer to it.
* If a logger with that name already exists, return a shared pointer to it
Expand Down Expand Up @@ -107,14 +109,20 @@ class LoggerProvider final : public opentelemetry::logs::LoggerProvider
*/
const opentelemetry::sdk::resource::Resource &GetResource() const noexcept;

private:
// A pointer to the processor stored by this logger provider
std::shared_ptr<sdk::logs::LoggerContext> context_;
/**
* Shutdown the log processor associated with this log provider.
*/
bool Shutdown() noexcept;

// A vector of pointers to all the loggers that have been created
std::vector<std::shared_ptr<opentelemetry::sdk::logs::Logger>> loggers_;
/**
* Force flush the log processor associated with this log provider.
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

// A mutex that ensures only one thread is using the map of loggers
private:
// order of declaration is important here - loggers should destroy only after context.
std::vector<std::shared_ptr<opentelemetry::sdk::logs::Logger>> loggers_;
std::shared_ptr<sdk::logs::LoggerContext> context_;
std::mutex lock_;
};
} // namespace logs
Expand Down
2 changes: 1 addition & 1 deletion sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class BatchSpanProcessor : public SpanProcessor

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;

/* The buffer/queue to which the ended spans are added */
common::CircularBuffer<Recordable> buffer_;
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/tracer_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider
*/
explicit TracerProvider(std::shared_ptr<sdk::trace::TracerContext> context) noexcept;

~TracerProvider();

opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> GetTracer(
nostd::string_view library_name,
nostd::string_view library_version = "",
Expand Down
14 changes: 10 additions & 4 deletions sdk/src/logs/batch_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,17 @@ void BatchLogProcessor::DrainQueue()

bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);
std::lock_guard<std::mutex> shutdown_guard{shutdown_m_};
bool already_shutdown = is_shutdown_.exchange(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}

// Should only shutdown exporter ONCE.
if (!already_shutdown && exporter_ != nullptr)
{
return exporter_->Shutdown();
}
Expand Down
11 changes: 5 additions & 6 deletions sdk/src/logs/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Logger::Logger(nostd::string_view name,
std::unique_ptr<instrumentationlibrary::InstrumentationLibrary>
instrumentation_library) noexcept
: logger_name_(std::string(name)),
context_(context),
instrumentation_library_{std::move(instrumentation_library)}
instrumentation_library_(std::move(instrumentation_library)),
context_(context)
{}

const nostd::string_view Logger::GetName() noexcept
Expand All @@ -45,12 +45,11 @@ void Logger::Log(opentelemetry::logs::Severity severity,
common::SystemTimestamp timestamp) noexcept
{
// If this logger does not have a processor, no need to create a log record
auto context = context_.lock();
if (!context)
if (!context_)
{
return;
}
auto &processor = context->GetProcessor();
auto &processor = context_->GetProcessor();

// TODO: Sampler (should include check for minSeverity)

Expand All @@ -68,7 +67,7 @@ void Logger::Log(opentelemetry::logs::Severity severity,
recordable->SetBody(body);
recordable->SetInstrumentationLibrary(GetInstrumentationLibrary());

recordable->SetResource(context->GetResource());
recordable->SetResource(context_->GetResource());

attributes.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept {
recordable->SetAttribute(key, value);
Expand Down
21 changes: 21 additions & 0 deletions sdk/src/logs/logger_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ LoggerProvider::LoggerProvider(std::shared_ptr<sdk::logs::LoggerContext> context
: context_{context}
{}

LoggerProvider::~LoggerProvider()
{
// Logger hold the shared pointer to the context. So we can not use destructor of LoggerContext to
// Shutdown and flush all pending recordables when we hasve more than one loggers.These
// recordables may use the raw pointer of instrumentation_library_ in Logger
if (context_)
{
context_->Shutdown();
}
}

nostd::shared_ptr<opentelemetry::logs::Logger> LoggerProvider::GetLogger(
nostd::string_view logger_name,
nostd::string_view options,
Expand Down Expand Up @@ -105,6 +116,16 @@ const opentelemetry::sdk::resource::Resource &LoggerProvider::GetResource() cons
return context_->GetResource();
}

bool LoggerProvider::Shutdown() noexcept
{
return context_->Shutdown();
}

bool LoggerProvider::ForceFlush(std::chrono::microseconds timeout) noexcept
{
return context_->ForceFlush(timeout);
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Expand Down
6 changes: 2 additions & 4 deletions sdk/src/logs/multi_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ bool MultiLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
}
for (auto &processor : processors_)
{
if (!processor->Shutdown(std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
result = false;
}
result |=
processor->Shutdown(std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns));
start_time = std::chrono::system_clock::now();
if (expire_time > start_time)
{
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/logs/simple_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
// Should only shutdown exporter ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
if (!shutdown_latch_.test_and_set(std::memory_order_acquire) && exporter_ != nullptr)
{
return exporter_->Shutdown(timeout);
}

return false;
return true;
}
} // namespace logs
} // namespace sdk
Expand Down
14 changes: 10 additions & 4 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,17 @@ void BatchSpanProcessor::DrainQueue()

bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);
std::lock_guard<std::mutex> shutdown_guard{shutdown_m_};
bool already_shutdown = is_shutdown_.exchange(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}

// Should only shutdown exporter ONCE.
if (!already_shutdown && exporter_ != nullptr)
{
return exporter_->Shutdown();
}
Expand Down
11 changes: 11 additions & 0 deletions sdk/src/trace/tracer_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ TracerProvider::TracerProvider(std::vector<std::unique_ptr<SpanProcessor>> &&pro
std::move(id_generator));
}

TracerProvider::~TracerProvider()
{
// Tracer hold the shared pointer to the context. So we can not use destructor of TracerContext to
// Shutdown and flush all pending recordables when we have more than one tracers.These recordables
// may use the raw pointer of instrumentation_library_ in Tracer
if (context_)
{
context_->Shutdown();
}
}

nostd::shared_ptr<trace_api::Tracer> TracerProvider::GetTracer(
nostd::string_view library_name,
nostd::string_view library_version,
Expand Down
2 changes: 2 additions & 0 deletions sdk/test/logs/batch_log_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ TEST_F(BatchLogProcessorTest, TestShutdown)
// current batch of logs to be sent to the log exporter
// by checking the number of logs sent and the names of the logs sent
EXPECT_EQ(true, batch_processor->Shutdown());
// It's safe to shutdown again
EXPECT_TRUE(batch_processor->Shutdown());

EXPECT_EQ(num_logs, logs_received->size());

Expand Down
27 changes: 27 additions & 0 deletions sdk/test/logs/logger_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# include "opentelemetry/sdk/logs/log_record.h"
# include "opentelemetry/sdk/logs/logger.h"
# include "opentelemetry/sdk/logs/logger_provider.h"
# include "opentelemetry/sdk/logs/simple_log_processor.h"

# include <gtest/gtest.h>

Expand Down Expand Up @@ -103,4 +104,30 @@ TEST(LoggerProviderSDK, GetResource)
LoggerProvider lp{nullptr, resource};
ASSERT_EQ(nostd::get<std::string>(lp.GetResource().GetAttributes().at("key")), "value");
}

TEST(LoggerProviderSDK, Shutdown)
{
std::unique_ptr<SimpleLogProcessor> processor(new SimpleLogProcessor(nullptr));
std::vector<std::unique_ptr<LogProcessor>> processors;
processors.push_back(std::move(processor));

LoggerProvider lp(std::make_shared<LoggerContext>(std::move(processors)));

EXPECT_TRUE(lp.Shutdown());

// It's safe to shutdown again
EXPECT_TRUE(lp.Shutdown());
}

TEST(LoggerProviderSDK, ForceFlush)
{
std::unique_ptr<SimpleLogProcessor> processor(new SimpleLogProcessor(nullptr));
std::vector<std::unique_ptr<LogProcessor>> processors;
processors.push_back(std::move(processor));

LoggerProvider lp(std::make_shared<LoggerContext>(std::move(processors)));

EXPECT_TRUE(lp.ForceFlush());
}

#endif
Loading

0 comments on commit c96a3e3

Please sign in to comment.