Skip to content

Commit

Permalink
Add simple log processor and log exporter interface (#403)
Browse files Browse the repository at this point in the history
* Add simple log processor and log exporter interface

* review comments

* update default duration and description

* Use span instead of vector for Export()

* Minor: timeout changed (for consistency)

* Remove TODOs, run format CI, and rebase master
  • Loading branch information
Karen Xu authored Dec 4, 2020
1 parent 956270e commit daab565
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 12 deletions.
72 changes: 72 additions & 0 deletions sdk/include/opentelemetry/sdk/logs/exporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <vector>
#include "opentelemetry/logs/log_record.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/sdk/logs/processor.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
/**
* ExportResult is returned as result of exporting a batch of Log Records.
*/
enum class ExportResult
{
// The batch was exported successfully
kSuccess = 0,
// The batch was exported unsuccessfully and was dropped, but can not be retried
kFailure
};

/**
* LogExporter defines the interface that log exporters must implement.
*/
class LogExporter
{
public:
virtual ~LogExporter() = default;

/**
* Exports the batch of log records to their export destination.
* This method must not be called concurrently for the same exporter instance.
* The exporter may attempt to retry sending the batch, but should drop
* and return kFailure after a certain timeout.
* @param records a span of unique pointers to log records
* @returns an ExportResult code (whether export was success or failure)
*/
virtual ExportResult Export(
const nostd::span<std::unique_ptr<opentelemetry::logs::LogRecord>> &records) noexcept = 0;

/**
* Marks the exporter as ShutDown and cleans up any resources as required.
* Shutdown should be called only once for each Exporter instance.
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
* returning failure.
* @return true if the exporter shutdown succeeded, false otherwise
*/
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
};
} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
28 changes: 22 additions & 6 deletions sdk/include/opentelemetry/sdk/logs/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,37 @@ namespace sdk
namespace logs
{
/**
* This Log Processor is responsible for conversion of logs to exportable
* representation and passing them to exporters.
* The Log Processor is responsible for passing log records
* to the configured exporter.
*/
class LogProcessor
{
public:
virtual ~LogProcessor() = default;

/**
* OnReceive is called by the SDK once a log record has been successfully created.
* @param record the log record
*/
virtual void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept = 0;

virtual void ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
/**
* Exports all log records that have not yet been exported to the configured Exporter.
* @param timeout that the forceflush is required to finish within.
* @return a result code indicating whether it succeeded, failed or timed out
*/
virtual bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;

virtual void Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
/**
* Shuts down the processor and does any cleanup required.
* ShutDown should only be called once for each processor.
* @param timeout minimum amount of microseconds to wait for
* shutdown before giving up and returning failure.
* @return true if the shutdown succeeded, false otherwise
*/
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
};
} // namespace logs
} // namespace sdk
Expand Down
64 changes: 64 additions & 0 deletions sdk/include/opentelemetry/sdk/logs/simple_log_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/processor.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
/**
* The simple log processor passes all log records
* in a batch of 1 to the configured
* LogExporter.
*
* All calls to the configured LogExporter are synchronized using a
* spin-lock on an atomic_flag.
*/
class SimpleLogProcessor : public LogProcessor
{

public:
explicit SimpleLogProcessor(std::unique_ptr<LogExporter> &&exporter);
virtual ~SimpleLogProcessor() = default;

void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept override;

bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

private:
// The configured exporter
std::unique_ptr<LogExporter> exporter_;
// The lock used to ensure the exporter is not called concurrently
opentelemetry::common::SpinLockMutex lock_;
// The atomic boolean flag to ensure the ShutDown() function is only called once
std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT};
};
} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
3 changes: 2 additions & 1 deletion sdk/src/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_library(opentelemetry_logs logger_provider.cc logger.cc)
add_library(opentelemetry_logs logger_provider.cc logger.cc
simple_log_processor.cc)

target_link_libraries(opentelemetry_logs opentelemetry_common)
71 changes: 71 additions & 0 deletions sdk/src/logs/simple_log_processor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "opentelemetry/sdk/logs/simple_log_processor.h"

#include <chrono>
#include <vector>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
/**
* Initialize a simple log processor.
* @param exporter the configured exporter where log records are sent
*/
SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr<LogExporter> &&exporter)
: exporter_(std::move(exporter))
{}

/**
* Batches the log record it receives in a batch of 1 and immediately sends it
* to the configured exporter
*/
void SimpleLogProcessor::OnReceive(
std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept
{
nostd::span<std::unique_ptr<opentelemetry::logs::LogRecord>> batch(&record, 1);
// Get lock to ensure Export() is never called concurrently
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);

if (exporter_->Export(batch) != ExportResult::kSuccess)
{
/* Alert user of the failed export */
}
}
/**
* The simple processor does not have any log records to flush so this method is not used
*/
bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
return true;
}

bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
// Should only shutdown exporter ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
{
return exporter_->Shutdown(timeout);
}

return false;
}
} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
11 changes: 11 additions & 0 deletions sdk/test/logs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "simple_log_processor_test",
srcs = [
"simple_log_processor_test.cc",
],
deps = [
"//sdk/src/logs",
"@com_google_googletest//:gtest_main",
],
)
3 changes: 2 additions & 1 deletion sdk/test/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
foreach(testname logger_provider_sdk_test logger_sdk_test)
foreach(testname logger_provider_sdk_test logger_sdk_test
simple_log_processor_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs)
Expand Down
10 changes: 8 additions & 2 deletions sdk/test/logs/logger_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ TEST(LoggerProviderSDK, LoggerProviderLoggerArguments)
class DummyProcessor : public LogProcessor
{
void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept {}
void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
{
return true;
}
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
{
return true;
}
};

TEST(LoggerProviderSDK, GetAndSetProcessor)
Expand Down
10 changes: 8 additions & 2 deletions sdk/test/logs/logger_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ TEST(LoggerSDK, LogToNullProcessor)
class DummyProcessor : public LogProcessor
{
void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept {}
void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
{
return true;
}
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
{
return true;
}
};

TEST(LoggerSDK, LogToAProcessor)
Expand Down
Loading

0 comments on commit daab565

Please sign in to comment.