Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metric to plugin demo #1106

Merged
merged 14 commits into from
Sep 7, 2023
5 changes: 5 additions & 0 deletions core/models/LogEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ bool LogEvent::FromJson(const Json::Value& root) {
return true;
}

uint64_t LogEvent::EventsSizeBytes() {
// TODO
return 0;
}

} // namespace logtail
3 changes: 2 additions & 1 deletion core/models/LogEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class LogEvent : public PipelineEvent {
// for debug and test
Json::Value ToJson() const override;
bool FromJson(const Json::Value&) override;

uint64_t EventsSizeBytes() override;

private:
LogEvent();

Expand Down
5 changes: 5 additions & 0 deletions core/models/MetricEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ bool MetricEvent::FromJson(const Json::Value& root) {
return true;
}

uint64_t MetricEvent::EventsSizeBytes() {
// TODO
return 0;
}

} // namespace logtail
1 change: 1 addition & 0 deletions core/models/MetricEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MetricEvent : public PipelineEvent {
// for debug and test
Json::Value ToJson() const override;
bool FromJson(const Json::Value&) override;
uint64_t EventsSizeBytes() override;
private:
MetricEvent();
};
Expand Down
2 changes: 2 additions & 0 deletions core/models/PipelineEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class PipelineEvent {
std::string ToJsonString() const;
bool FromJsonString(const std::string&);

virtual uint64_t EventsSizeBytes() = 0;

protected:
void SetSourceBuffer(std::shared_ptr<SourceBuffer> sourceBuffer) { mSourceBuffer = sourceBuffer; }

Expand Down
5 changes: 5 additions & 0 deletions core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,9 @@ bool PipelineEventGroup::FromJsonString(const std::string& inJson) {
return FromJson(root);
}

uint64_t PipelineEventGroup::EventGroupSizeBytes() {
// TODO
return 0;
}

} // namespace logtail
1 change: 1 addition & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class PipelineEventGroup {
bool FromJson(const Json::Value&);
std::string ToJsonString() const;
bool FromJsonString(const std::string&);
uint64_t EventGroupSizeBytes();

private:
GroupInfo mGroup;
Expand Down
5 changes: 5 additions & 0 deletions core/models/SpanEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ bool SpanEvent::FromJson(const Json::Value& root) {
return true;
}

uint64_t SpanEvent::EventsSizeBytes() {
// TODO
return 0;
}

} // namespace logtail
2 changes: 1 addition & 1 deletion core/models/SpanEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SpanEvent : public PipelineEvent {
// for debug and test
Json::Value ToJson() const override;
bool FromJson(const Json::Value&) override;

uint64_t EventsSizeBytes() override;
private:
SpanEvent();
};
Expand Down
15 changes: 15 additions & 0 deletions core/monitor/LogtaiMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ WriteMetrics::~WriteMetrics() {
Clear();
}

void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName,
const std::string& logstoreName,
const std::string& region,
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
MetricLabels& labels) {
labels.emplace_back(std::make_pair("project", projectName));
labels.emplace_back(std::make_pair("logstore", logstoreName));
labels.emplace_back(std::make_pair("region", region));
labels.emplace_back(std::make_pair("config_name", configName));
labels.emplace_back(std::make_pair("plugin_name", pluginName));
labels.emplace_back(std::make_pair("plugin_id", pluginID));
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels) {
MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels));
ref.SetMetricsRecord(cur);
Expand Down
7 changes: 7 additions & 0 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ class WriteMetrics {
static WriteMetrics* ptr = new WriteMetrics();
return ptr;
}
void PreparePluginCommonLabels(const std::string& projectName,
const std::string& logstoreName,
const std::string& region,
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels);
MetricsRecord* DoSnapshot();

Expand Down
34 changes: 34 additions & 0 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "MetricConstants.h"

namespace logtail {


const std::string METRIC_FIELD_REGION = "region";
const std::string METRIC_REGION_DEFAULT = "default";
const std::string METRIC_SLS_LOGSTORE_NAME = "shennong_log_profile";
const std::string METRIC_TOPIC_TYPE = "logtail_metric";
const std::string METRIC_TOPIC_FIELD_NAME = "__topic__";

const std::string LABEL_PREFIX = "label.";
const std::string VALUE_PREFIX = "value.";


// processor common metrics
const std::string METRIC_PROC_IN_RECORDS_TOTAL = "proc_in_records_total";
const std::string METRIC_PROC_IN_RECORDS_SIZE_BYTES = "proc_in_records_size_bytes";
const std::string METRIC_PROC_OUT_RECORDS_TOTAL = "proc_out_records_total";
const std::string METRIC_PROC_OUT_RECORDS_SIZE_BYTES = "proc_out_records_size_bytes";
const std::string METRIC_PROC_DISCARD_RECORDS_TOTAL = "proc_discard_records_total";
const std::string METRIC_PROC_TIME_MS = "proc_time_ms";

// processor cunstom metrics
const std::string METRIC_PROC_PARSE_IN_SIZE_BYTES = "proc_parse_in_size_bytes";
const std::string METRIC_PROC_PARSE_OUT_SIZE_BYTES = "proc_parse_out_size_bytes";

const std::string METRIC_PROC_PARSE_ERROR_TOTAL = "proc_parse_error_total";
const std::string METRIC_PROC_KEY_COUNT_NOT_MATCH_ERROR_TOTAL = "proc_key_count_not_match_error_total";

// processore plugin name
const std::string PLUGIN_PROCESSOR_PARSE_REGEX_NATIVE = "processor_parse_regex_native";

}
35 changes: 22 additions & 13 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@

namespace logtail {

const std::string METRIC_FIELD_REGION = "region";
const std::string METRIC_REGION_DEFAULT = "default";
const std::string METRIC_SLS_LOGSTORE_NAME = "shennong_log_profile";
const std::string METRIC_TOPIC_TYPE = "logtail_metric";
const std::string METRIC_TOPIC_FIELD_NAME = "__topic__";

const std::string LABEL_PREFIX = "label.";
const std::string VALUE_PREFIX = "value.";



const std::string METRIC_FILE_READ_COUNT = "file_read_count";
const std::string METRIC_FILE_READ_BYTES = "file_read_bytes";
extern const std::string METRIC_FIELD_REGION;
extern const std::string METRIC_REGION_DEFAULT;
extern const std::string METRIC_SLS_LOGSTORE_NAME;
extern const std::string METRIC_TOPIC_TYPE;
extern const std::string METRIC_TOPIC_FIELD_NAME;

extern const std::string LABEL_PREFIX;
extern const std::string VALUE_PREFIX;

// processor common metrics
extern const std::string METRIC_PROC_IN_RECORDS_TOTAL;
extern const std::string METRIC_PROC_IN_RECORDS_SIZE_BYTES;
extern const std::string METRIC_PROC_OUT_RECORDS_TOTAL;
extern const std::string METRIC_PROC_OUT_RECORDS_SIZE_BYTES;
extern const std::string METRIC_PROC_DISCARD_RECORDS_TOTAL;
extern const std::string METRIC_PROC_TIME_MS;

// processor custom metrics
extern const std::string METRIC_PROC_PARSE_IN_SIZE_BYTES;
extern const std::string METRIC_PROC_PARSE_OUT_SIZE_BYTES;
extern const std::string METRIC_PROC_PARSE_ERROR_TOTAL;
extern const std::string METRIC_PROC_KEY_COUNT_NOT_MATCH_ERROR_TOTAL;

} // namespace logtail
5 changes: 3 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,14 @@ void Pipeline::Process(PipelineEventGroup& logGroup) {
}
}

bool Pipeline::InitAndAddProcessor(std::unique_ptr<ProcessorInstance>&& processor, const ComponentConfig& config) {
bool Pipeline::InitAndAddProcessor(std::unique_ptr<ProcessorInstance>&& processor, const PipelineConfig& config) {
if (!processor) {
LOG_ERROR(GetContext().GetLogger(),
("CreateProcessor", ProcessorSplitRegexNative::Name())("Error", "Cannot find plugin"));
return false;
}
if (!processor->Init(config, mContext)) {
ComponentConfig componentConfig(processor->Id(), config);
if (!processor->Init(componentConfig, mContext)) {
LOG_ERROR(GetContext().GetLogger(), ("InitProcessor", processor->Id())("Error", "Init failed"));
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Pipeline {
PipelineContext& GetContext() { return mContext; }

private:
bool InitAndAddProcessor(std::unique_ptr<ProcessorInstance>&& processor, const ComponentConfig& config);
bool InitAndAddProcessor(std::unique_ptr<ProcessorInstance>&& processor, const PipelineConfig& config);

std::string mName;
std::vector<std::unique_ptr<ProcessorInstance> > mProcessorLine;
Expand Down
12 changes: 11 additions & 1 deletion core/pipeline/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@

namespace logtail {

using ComponentConfig = Config; // use Config temporarily
using PipelineConfig = Config; // should use json like object

class ComponentConfig {
public:
ComponentConfig(const std::string& id, const PipelineConfig& config) : mId(id), mConfig(config) {}
const std::string& GetId() const { return mId; }
const Config& GetConfig() const { return mConfig; }

private:
const std::string& mId;
const Config& mConfig; // use Config temporarily
};

} // namespace logtail
21 changes: 20 additions & 1 deletion core/plugin/ProcessorInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,38 @@
*/

#include "plugin/ProcessorInstance.h"
#include "monitor/MetricConstants.h"

namespace logtail {

bool ProcessorInstance::Init(const ComponentConfig& config, PipelineContext& context) {
mContext = &context;
mPlugin->SetContext(context);
return mPlugin->Init(config);
bool inited = mPlugin->Init(config);
if (!inited) {
return inited;
}
// should init plugin first, then could GetMetricsRecordRef from plugin
mProcInRecordsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PROC_IN_RECORDS_TOTAL);
mProcOutRecordsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PROC_OUT_RECORDS_TOTAL);
mProcTimeMS = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PROC_TIME_MS);

return inited;
}

void ProcessorInstance::Process(PipelineEventGroup& logGroup) {
size_t inSize = logGroup.GetEvents().size();

mProcInRecordsTotal->Add(inSize);
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved

uint64_t startTime = GetCurrentTimeInMicroSeconds();
mPlugin->Process(logGroup);
uint64_t durationTime = GetCurrentTimeInMicroSeconds() - startTime;

mProcTimeMS->Add(durationTime);

size_t outSize = logGroup.GetEvents().size();
mProcOutRecordsTotal->Add(outSize);
LOG_DEBUG(mContext->GetLogger(), ("Processor", Id())("InSize", inSize)("OutSize", outSize));
}

Expand Down
17 changes: 15 additions & 2 deletions core/plugin/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,36 @@

#include <string>
#include "plugin/PluginInstance.h"
#include "processor/Processor.h"
#include "plugin/PluginCreator.h"
#include "processor/Processor.h"
#include "pipeline/PipelineConfig.h"
#include "pipeline/PipelineContext.h"
#include "monitor/LogtailMetric.h"
#include "monitor/MetricConstants.h"

namespace logtail {


class ProcessorInstance : public PluginInstance {
public:
ProcessorInstance(Processor* plugin, const std::string& pluginId) : PluginInstance(pluginId), mPlugin(plugin) {}

PipelineContext& GetContext() { return *mContext; }
bool Init(const ComponentConfig& config, PipelineContext& context);
void Process(PipelineEventGroup& logGroup);

private:
std::unique_ptr<Processor> mPlugin;
PipelineContext* mContext = nullptr;

CounterPtr mProcInRecordsTotal;
CounterPtr mProcOutRecordsTotal;
// CounterPtr mProcInRecordsSizeBytes;
// CounterPtr mProcOutRecordsSizeBytes;
CounterPtr mProcTimeMS;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorParseRegexNativeUnittest;
#endif
};

} // namespace logtail
4 changes: 2 additions & 2 deletions core/processor/DynamicCProcessorProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const char* DynamicCProcessorProxy::Name() const {
return _name.c_str();
}

bool DynamicCProcessorProxy::Init(const ComponentConfig& config) {
return _c_ins->plugin->init(_c_ins, (void*)(&config), (void*)(&GetContext())) == 0;
bool DynamicCProcessorProxy::Init(const ComponentConfig& componentConfig) {
return _c_ins->plugin->init(_c_ins, (void*)(&componentConfig), (void*)(&GetContext())) == 0;
}

void DynamicCProcessorProxy::Process(PipelineEventGroup& logGroup) {
Expand Down
2 changes: 1 addition & 1 deletion core/processor/DynamicCProcessorProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class DynamicCProcessorProxy : public Processor {
DynamicCProcessorProxy(const char* name);
~DynamicCProcessorProxy();
const char* Name() const;
bool Init(const ComponentConfig& config) override;
bool Init(const ComponentConfig& componentConfig) override;
void Process(PipelineEventGroup& logGroup) override;
void SetCProcessor(const processor_interface_t* c_ins);

Expand Down
18 changes: 18 additions & 0 deletions core/processor/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,37 @@
#include "models/PipelineEventGroup.h"
#include "pipeline/PipelineConfig.h"
#include "pipeline/PipelineContext.h"
#include "monitor/LogtailMetric.h"


namespace logtail {

class ProcessorInstance;
class Processor {
public:
virtual ~Processor() {}
void SetContext(PipelineContext& context) { mContext = &context; }
PipelineContext& GetContext() { return *mContext; }
MetricsRecordRef GetMetricsRecordRef() { return mMetricsRecordRef; }
virtual bool Init(const ComponentConfig& config) = 0;
virtual void Process(PipelineEventGroup& logGroup) = 0;

protected:
virtual bool IsSupportedEvent(const PipelineEventPtr& e) = 0;
PipelineContext* mContext = nullptr;
MetricsRecordRef mMetricsRecordRef;

void SetMetricsRecordRef(std::string name, std::string id) {
std::vector<std::pair<std::string, std::string>> labels;
WriteMetrics::GetInstance()->PreparePluginCommonLabels(GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion(),
GetContext().GetConfigName(),
name,
id,
labels);

WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
}
};
} // namespace logtail
Loading
Loading