diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h new file mode 100644 index 0000000000..6f44d32a28 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "opentelemetry/logs/log_record.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/sdk/logs/processor.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +/** + * ExportResult is returned as result of exporting a batch of Log Records. + */ +enum class ExportResult +{ + // The batch was exported successfully + kSuccess = 0, + // The batch was exported unsuccessfully and was dropped, but can not be retried + kFailure +}; + +/** + * LogExporter defines the interface that log exporters must implement. + */ +class LogExporter +{ +public: + virtual ~LogExporter() = default; + + /** + * Exports the batch of log records to their export destination. + * This method must not be called concurrently for the same exporter instance. + * The exporter may attempt to retry sending the batch, but should drop + * and return kFailure after a certain timeout. + * @param records a span of unique pointers to log records + * @returns an ExportResult code (whether export was success or failure) + */ + virtual ExportResult Export( + const nostd::span> &records) noexcept = 0; + + /** + * Marks the exporter as ShutDown and cleans up any resources as required. + * Shutdown should be called only once for each Exporter instance. + * @param timeout minimum amount of microseconds to wait for shutdown before giving up and + * returning failure. + * @return true if the exporter shutdown succeeded, false otherwise + */ + virtual bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0; +}; +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/logs/processor.h b/sdk/include/opentelemetry/sdk/logs/processor.h index d77b7a65d0..7468548dcf 100644 --- a/sdk/include/opentelemetry/sdk/logs/processor.h +++ b/sdk/include/opentelemetry/sdk/logs/processor.h @@ -26,21 +26,37 @@ namespace sdk namespace logs { /** - * This Log Processor is responsible for conversion of logs to exportable - * representation and passing them to exporters. + * The Log Processor is responsible for passing log records + * to the configured exporter. */ class LogProcessor { public: virtual ~LogProcessor() = default; + /** + * OnReceive is called by the SDK once a log record has been successfully created. + * @param record the log record + */ virtual void OnReceive(std::unique_ptr &&record) noexcept = 0; - virtual void ForceFlush( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; + /** + * Exports all log records that have not yet been exported to the configured Exporter. + * @param timeout that the forceflush is required to finish within. + * @return a result code indicating whether it succeeded, failed or timed out + */ + virtual bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0; - virtual void Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; + /** + * Shuts down the processor and does any cleanup required. + * ShutDown should only be called once for each processor. + * @param timeout minimum amount of microseconds to wait for + * shutdown before giving up and returning failure. + * @return true if the shutdown succeeded, false otherwise + */ + virtual bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0; }; } // namespace logs } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h new file mode 100644 index 0000000000..18ed28b2cf --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "opentelemetry/common/spin_lock_mutex.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/processor.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +/** + * The simple log processor passes all log records + * in a batch of 1 to the configured + * LogExporter. + * + * All calls to the configured LogExporter are synchronized using a + * spin-lock on an atomic_flag. + */ +class SimpleLogProcessor : public LogProcessor +{ + +public: + explicit SimpleLogProcessor(std::unique_ptr &&exporter); + virtual ~SimpleLogProcessor() = default; + + void OnReceive(std::unique_ptr &&record) noexcept override; + + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + +private: + // The configured exporter + std::unique_ptr exporter_; + // The lock used to ensure the exporter is not called concurrently + opentelemetry::common::SpinLockMutex lock_; + // The atomic boolean flag to ensure the ShutDown() function is only called once + std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT}; +}; +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index 22260f059d..e2e7c2c915 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -1,3 +1,4 @@ -add_library(opentelemetry_logs logger_provider.cc logger.cc) +add_library(opentelemetry_logs logger_provider.cc logger.cc + simple_log_processor.cc) target_link_libraries(opentelemetry_logs opentelemetry_common) diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc new file mode 100644 index 0000000000..d88fc9eb92 --- /dev/null +++ b/sdk/src/logs/simple_log_processor.cc @@ -0,0 +1,71 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/sdk/logs/simple_log_processor.h" + +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +/** + * Initialize a simple log processor. + * @param exporter the configured exporter where log records are sent + */ +SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) + : exporter_(std::move(exporter)) +{} + +/** + * Batches the log record it receives in a batch of 1 and immediately sends it + * to the configured exporter + */ +void SimpleLogProcessor::OnReceive( + std::unique_ptr &&record) noexcept +{ + nostd::span> batch(&record, 1); + // Get lock to ensure Export() is never called concurrently + const std::lock_guard locked(lock_); + + if (exporter_->Export(batch) != ExportResult::kSuccess) + { + /* Alert user of the failed export */ + } +} +/** + * The simple processor does not have any log records to flush so this method is not used + */ +bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return true; +} + +bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +{ + // Should only shutdown exporter ONCE. + if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) + { + return exporter_->Shutdown(timeout); + } + + return false; +} +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/logs/BUILD b/sdk/test/logs/BUILD index b58eac0eee..ccbbfd8c78 100644 --- a/sdk/test/logs/BUILD +++ b/sdk/test/logs/BUILD @@ -20,3 +20,14 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "simple_log_processor_test", + srcs = [ + "simple_log_processor_test.cc", + ], + deps = [ + "//sdk/src/logs", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/sdk/test/logs/CMakeLists.txt b/sdk/test/logs/CMakeLists.txt index c1342a16b9..024c42c45a 100644 --- a/sdk/test/logs/CMakeLists.txt +++ b/sdk/test/logs/CMakeLists.txt @@ -1,4 +1,5 @@ -foreach(testname logger_provider_sdk_test logger_sdk_test) +foreach(testname logger_provider_sdk_test logger_sdk_test + simple_log_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs) diff --git a/sdk/test/logs/logger_provider_sdk_test.cc b/sdk/test/logs/logger_provider_sdk_test.cc index a2a020c838..02e7c47275 100644 --- a/sdk/test/logs/logger_provider_sdk_test.cc +++ b/sdk/test/logs/logger_provider_sdk_test.cc @@ -70,8 +70,14 @@ TEST(LoggerProviderSDK, LoggerProviderLoggerArguments) class DummyProcessor : public LogProcessor { void OnReceive(std::unique_ptr &&record) noexcept {} - void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} - void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} + bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return true; + } + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return true; + } }; TEST(LoggerProviderSDK, GetAndSetProcessor) diff --git a/sdk/test/logs/logger_sdk_test.cc b/sdk/test/logs/logger_sdk_test.cc index 6c36b68c0c..d4825d0032 100644 --- a/sdk/test/logs/logger_sdk_test.cc +++ b/sdk/test/logs/logger_sdk_test.cc @@ -38,8 +38,14 @@ TEST(LoggerSDK, LogToNullProcessor) class DummyProcessor : public LogProcessor { void OnReceive(std::unique_ptr &&record) noexcept {} - void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} - void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} + bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return true; + } + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return true; + } }; TEST(LoggerSDK, LogToAProcessor) diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc new file mode 100644 index 0000000000..834cba7ad8 --- /dev/null +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -0,0 +1,138 @@ +#include "opentelemetry/sdk/logs/simple_log_processor.h" +#include "opentelemetry/sdk/logs/exporter.h" + +#include + +#include +#include + +using namespace opentelemetry::sdk::logs; +using opentelemetry::logs::LogRecord; + +/* + * A test exporter that can return a vector of all the records it has received, + * and keep track of the number of times its Shutdown() function was called. + */ +class TestExporter final : public LogExporter +{ +public: + TestExporter(int *shutdown_counter, + std::shared_ptr> logs_received, + int *batch_size_received) + : shutdown_counter_(shutdown_counter), + logs_received_(logs_received), + batch_size_received(batch_size_received) + {} + + // Stores the names of the log records this exporter receives to an internal list + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override + { + *batch_size_received = records.size(); + for (auto &record : records) + { + logs_received_->push_back(record->name.data()); + } + return ExportResult::kSuccess; + } + + // Increment the shutdown counter everytime this method is called + bool Shutdown(std::chrono::microseconds timeout) noexcept override + { + *shutdown_counter_ += 1; + return true; + } + +private: + int *shutdown_counter_; + std::shared_ptr> logs_received_; + int *batch_size_received; +}; + +// Tests whether the simple processor successfully creates a batch of size 1 +// and whether the contents of the record is sent to the exporter correctly +TEST(SimpleLogProcessorTest, SendReceivedLogsToExporter) +{ + // Create a simple processor with a TestExporter attached + std::shared_ptr> logs_received(new std::vector); + int batch_size_received = 0; + + std::unique_ptr exporter( + new TestExporter(nullptr, logs_received, &batch_size_received)); + + SimpleLogProcessor processor(std::move(exporter)); + + // Send some log records to the processor (which should then send to the TestExporter) + const int num_logs = 5; + for (int i = 0; i < num_logs; i++) + { + auto record = std::unique_ptr(new LogRecord()); + std::string s("Log name"); + s += std::to_string(i); + record->name = s; + + processor.OnReceive(std::move(record)); + + // Verify that the batch of 1 log record sent by processor matches what exporter received + EXPECT_EQ(1, batch_size_received); + } + + // Test whether the processor's log sent matches the log record received by the exporter + EXPECT_EQ(logs_received->size(), num_logs); + for (int i = 0; i < num_logs; i++) + { + std::string s("Log name"); + s += std::to_string(i); + EXPECT_EQ(s, logs_received->at(i)); + } +} + +// Tests behavior when calling the processor's ShutDown() multiple times +TEST(SimpleLogProcessorTest, ShutdownCalledOnce) +{ + // Create a TestExporter + int num_shutdowns = 0; + + std::unique_ptr exporter(new TestExporter(&num_shutdowns, nullptr, nullptr)); + + // Create a processor with the previous test exporter + SimpleLogProcessor processor(std::move(exporter)); + + // The first time processor shutdown is called + EXPECT_EQ(0, num_shutdowns); + EXPECT_EQ(true, processor.Shutdown()); + EXPECT_EQ(1, num_shutdowns); + + // The second time processor shutdown is called + EXPECT_EQ(false, processor.Shutdown()); + // Processor::ShutDown(), even if called more than once, should only shutdown exporter once + EXPECT_EQ(1, num_shutdowns); +} + +// A test exporter that always returns failure when shut down +class FailShutDownExporter final : public LogExporter +{ +public: + FailShutDownExporter() {} + + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override + { + return ExportResult::kSuccess; + } + + bool Shutdown(std::chrono::microseconds timeout) noexcept override { return false; } +}; + +// Tests for when when processor should fail to shutdown +TEST(SimpleLogProcessorTest, ShutDownFail) +{ + std::unique_ptr exporter(new FailShutDownExporter()); + SimpleLogProcessor processor(std::move(exporter)); + + // Expect failure result when exporter fails to shutdown + EXPECT_EQ(false, processor.Shutdown()); + + // Expect failure result when processor given a negative timeout allowed to shutdown + EXPECT_EQ(false, processor.Shutdown(std::chrono::microseconds(-1))); +}