Skip to content

Commit

Permalink
Add Asynchronous Metric Instruments SDK (open-telemetry#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankit-bhargava authored Aug 5, 2020
1 parent c47e443 commit c344737
Show file tree
Hide file tree
Showing 4 changed files with 533 additions and 32 deletions.
272 changes: 272 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/async_instruments.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
#pragma once

#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>
#include "opentelemetry/metrics/async_instruments.h"
#include "opentelemetry/sdk/metrics/aggregator/counter_aggregator.h"
#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h"
#include "opentelemetry/sdk/metrics/instrument.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

template <class T>
class ValueObserver : public AsynchronousInstrument<T>, virtual public metrics_api::ValueObserver<T>
{

public:
ValueObserver() = default;

ValueObserver(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>))
: AsynchronousInstrument<T>(name,
description,
unit,
enabled,
callback,
metrics_api::InstrumentKind::ValueObserver)
{}

/*
* Updates the instruments aggregator with the new value. The labels should
* contain the keys and values to be associated with this value.
*
* @param value is the numerical representation of the metric being captured
* @param labels the set of labels, as key-value pairs
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override
{
this->mu_.lock();
std::string labelset = KvToString(labels);
if (boundAggregators_.find(labelset) == boundAggregators_.end())
{
auto sp1 = std::shared_ptr<Aggregator<T>>(new MinMaxSumCountAggregator<T>(this->kind_));
boundAggregators_.insert(std::make_pair(labelset, sp1));
sp1->update(value);
}
else
{
boundAggregators_[labelset]->update(value);
}
this->mu_.unlock();
}

/*
* Activate the instrument's callback function to record a measurement. This
* function will be called by the specified controller at a regular interval.
*
* @param none
* @return none
*/
virtual void run() override
{
metrics_api::ObserverResult<T> res(this);
this->callback_(res);
}

virtual std::vector<Record> GetRecords() override
{
this->mu_.lock();
std::vector<Record> ret;
for (auto x : boundAggregators_)
{
x.second->checkpoint();
ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second));
}
boundAggregators_.clear();
this->mu_.unlock();
return ret;
}

// Public mapping from labels (stored as strings) to their respective aggregators
std::unordered_map<std::string, std::shared_ptr<Aggregator<T>>> boundAggregators_;
};

template <class T>
class SumObserver : public AsynchronousInstrument<T>, virtual public metrics_api::SumObserver<T>
{

public:
SumObserver() = default;

SumObserver(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>))
: AsynchronousInstrument<T>(name,
description,
unit,
enabled,
callback,
metrics_api::InstrumentKind::SumObserver)
{}

/*
* Updates the instruments aggregator with the new value. The labels should
* contain the keys and values to be associated with this value.
*
* @param value is the numerical representation of the metric being captured
* @param labels the set of labels, as key-value pairs
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override
{
this->mu_.lock();
std::string labelset = KvToString(labels);
if (boundAggregators_.find(labelset) == boundAggregators_.end())
{
auto sp1 = std::shared_ptr<Aggregator<T>>(new CounterAggregator<T>(this->kind_));
boundAggregators_.insert(std::make_pair(labelset, sp1));
if (value < 0)
{
#if __EXCEPTIONS
throw std::invalid_argument("Counter instrument updates must be non-negative.");
#else
std::terminate();
#endif
}
else
{
sp1->update(value);
}
}
else
{
if (value < 0)
{
#if __EXCEPTIONS
throw std::invalid_argument("Counter instrument updates must be non-negative.");
#else
std::terminate();
#endif
}
else
{
boundAggregators_[labelset]->update(value);
}
}
this->mu_.unlock();
}

/*
* Activate the intsrument's callback function to record a measurement. This
* function will be called by the specified controller at a regular interval.
*
* @param none
* @return none
*/
virtual void run() override
{
metrics_api::ObserverResult<T> res(this);
this->callback_(res);
}

virtual std::vector<Record> GetRecords() override
{
this->mu_.lock();
std::vector<Record> ret;
for (auto x : boundAggregators_)
{
x.second->checkpoint();
ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second));
}
boundAggregators_.clear();
this->mu_.unlock();
return ret;
}

// Public mapping from labels (stored as strings) to their respective aggregators
std::unordered_map<std::string, std::shared_ptr<Aggregator<T>>> boundAggregators_;
};

template <class T>
class UpDownSumObserver : public AsynchronousInstrument<T>,
virtual public metrics_api::UpDownSumObserver<T>
{

public:
UpDownSumObserver() = default;

UpDownSumObserver(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>))
: AsynchronousInstrument<T>(name,
description,
unit,
enabled,
callback,
metrics_api::InstrumentKind::UpDownSumObserver)
{}

/*
* Updates the instruments aggregator with the new value. The labels should
* contain the keys and values to be associated with this value.
*
* @param value is the numerical representation of the metric being captured
* @param labels the set of labels, as key-value pairs
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override
{
this->mu_.lock();
std::string labelset = KvToString(labels);
if (boundAggregators_.find(labelset) == boundAggregators_.end())
{
auto sp1 = std::shared_ptr<Aggregator<T>>(new CounterAggregator<T>(this->kind_));
boundAggregators_.insert(std::make_pair(labelset, sp1));
sp1->update(value);
}
else
{
boundAggregators_[labelset]->update(value);
}
this->mu_.unlock();
}

/*
* Activate the intsrument's callback function to record a measurement. This
* function will be called by the specified controller at a regular interval.
*
* @param none
* @return none
*/
virtual void run() override
{
metrics_api::ObserverResult<T> res(this);
this->callback_(res);
}

virtual std::vector<Record> GetRecords() override
{
this->mu_.lock();
std::vector<Record> ret;
for (auto x : boundAggregators_)
{
x.second->checkpoint();
ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second));
}
boundAggregators_.clear();
this->mu_.unlock();
return ret;
}

// Public mapping from labels (stored as strings) to their respective aggregators
std::unordered_map<std::string, std::shared_ptr<Aggregator<T>>> boundAggregators_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
67 changes: 60 additions & 7 deletions sdk/include/opentelemetry/sdk/metrics/instrument.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#pragma once

#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/sdk/metrics/record.h"
#include "opentelemetry/version.h"

#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/sdk/metrics/record.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;
namespace trace_api = opentelemetry::trace;
Expand Down Expand Up @@ -85,15 +84,25 @@ class BoundSynchronousInstrument : public Instrument,
* @param none
* @return void
*/
virtual void unbind() override { ref_ -= 1; }
virtual void unbind() override
{
this->mu_.lock();
ref_ -= 1;
this->mu_.unlock();
}

/**
* Increments the reference count. This function is used when binding or instantiating.
*
* @param none
* @return void
*/
virtual void inc_ref() override { ref_ += 1; }
virtual void inc_ref() override
{
this->mu_.lock();
ref_ += 1;
this->mu_.unlock();
}

/**
* Returns the current reference count of the instrument. This value is used to
Expand Down Expand Up @@ -164,6 +173,7 @@ class SynchronousInstrument : public Instrument,
return nostd::shared_ptr<BoundSynchronousInstrument<T>>();
}

// This function is necessary for batch recording and should NOT be called by the user
virtual void update(T value, const trace::KeyValueIterable &labels) override = 0;

/**
Expand All @@ -177,6 +187,49 @@ class SynchronousInstrument : public Instrument,
virtual std::vector<Record> GetRecords() = 0;
};

template <class T>
class AsynchronousInstrument : public Instrument,
virtual public metrics_api::AsynchronousInstrument<T>
{

public:
AsynchronousInstrument() = default;

AsynchronousInstrument(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>),
metrics_api::InstrumentKind kind)
: Instrument(name, description, unit, enabled, kind)
{
this->callback_ = callback;
}

/**
* Captures data through a manual call rather than the automatic collection process instituted
* in the run function. Asynchronous instruments are generally expected to obtain data from
* their callbacks rather than direct calls. This function is used by the callback to store data.
*
* @param value is the numerical representation of the metric being captured
* @param labels is the numerical representation of the metric being captured
* @return none
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override = 0;

virtual std::vector<Record> GetRecords() = 0;

/**
* Captures data by activating the callback function associated with the
* instrument and storing its return value. Callbacks for asynchronous
* instruments are defined during construction.
*
* @param none
* @return none
*/
virtual void run() override = 0;
};

// Helper functions for turning a trace::KeyValueIterable into a string
inline void print_value(std::stringstream &ss,
common::AttributeValue &value,
Expand Down
Loading

0 comments on commit c344737

Please sign in to comment.