Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization of the self-monitoring metrics architecture for Go (Golang) plugin modules and unified output of metrics with C++ modules. #1290

Merged
merged 42 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0daec5e
add metric code
linrunqi08 Dec 27, 2023
9847679
add cpp call go
linrunqi08 Dec 27, 2023
30810be
update call func
linrunqi08 Dec 27, 2023
b8559c3
tmp debug
linrunqi08 Dec 27, 2023
f26e1a8
tmp save
linrunqi08 Dec 28, 2023
e436d75
tmp save
linrunqi08 Dec 28, 2023
7c38780
add flusher
linrunqi08 Dec 28, 2023
33ea6a8
add flusher
linrunqi08 Dec 28, 2023
9617fb7
fix cpp interface
linrunqi08 Dec 29, 2023
1c7da0f
refine code
linrunqi08 Dec 29, 2023
1ad21ef
refine code
linrunqi08 Dec 29, 2023
465b54c
refine code
linrunqi08 Dec 29, 2023
1ec3dd3
fix comments
linrunqi08 Jan 2, 2024
c1d90cf
fix comment
linrunqi08 Jan 2, 2024
d8c2263
fix comment
linrunqi08 Jan 2, 2024
77ffaaa
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 2, 2024
de51063
fix comments
linrunqi08 Jan 9, 2024
e19a9bd
fix build
linrunqi08 Jan 9, 2024
09136c2
add node id
linrunqi08 Jan 12, 2024
9ba425d
add child plugin id
linrunqi08 Jan 12, 2024
e104daa
fix metric
linrunqi08 Jan 15, 2024
96e8296
fix unittest
linrunqi08 Jan 15, 2024
2107d4f
fix go lint
linrunqi08 Jan 15, 2024
3b351c8
fix go lint
linrunqi08 Jan 15, 2024
75c0f4f
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 15, 2024
9bb08c6
merge 1.8
linrunqi08 Jan 15, 2024
81481bd
fix ut
linrunqi08 Jan 15, 2024
d70ad57
try fix e2e
linrunqi08 Jan 15, 2024
2ebca2f
fix comments
linrunqi08 Jan 15, 2024
b12a775
fix comments
linrunqi08 Jan 15, 2024
20480fe
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 16, 2024
d18453e
add comments
linrunqi08 Jan 22, 2024
0a2d671
fix comments
linrunqi08 Jan 22, 2024
192337b
fix comments
linrunqi08 Jan 22, 2024
3d7964b
fix comments
linrunqi08 Jan 22, 2024
2b401c1
fix comments
linrunqi08 Jan 22, 2024
c4ecf69
fix comments
linrunqi08 Jan 22, 2024
a46c523
fix lint
linrunqi08 Jan 22, 2024
d2aa7d3
fix ut
linrunqi08 Jan 22, 2024
e216f7e
fix c++ pluginID
linrunqi08 Jan 22, 2024
81e2942
fix comments
linrunqi08 Jan 23, 2024
fa9eea1
fix ut
linrunqi08 Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ bool LogtailPlugin::LoadPluginBase() {
return mPluginValid;
}

mGetPipelineMetricsFun = (GetPipelineMetricsFun)loader.LoadMethod("GetPipelineMetrics", error);
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
if (!error.empty()) {
LOG_ERROR(sLogger, ("load GetPipelineMetrics error, Message", error));
return mPluginValid;
}

mPluginBasePtr = loader.Release();
}

Expand Down Expand Up @@ -517,6 +523,48 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
}
}

void LogtailPlugin::GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList) {
if (mGetPipelineMetricsFun != nullptr) {
auto metrics = mGetPipelineMetricsFun();
if (metrics != nullptr) {
for (int i = 0; i < metrics->count; ++i) {
std::map<std::string, std::string> item;
InnerPluginMetric* innerpm = metrics->metrics[i];
if (innerpm != nullptr) {
for (int j = 0; j < innerpm->count; ++j) {
InnerKeyValue* innerkv = innerpm->keyValues[j];
if (innerkv != nullptr) {
item.insert(std::make_pair(std::string(innerkv->key), std::string(innerkv->value)));
}
}
}
metircsList.emplace_back(item);
}
if (metrics->count > 0) {
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < metrics->count; ++i) {
InnerPluginMetric* innerpm = metrics->metrics[i];
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
if (innerpm != nullptr) {
if (innerpm->count > 0) {
for (int j = 0; j < innerpm->count; ++j) {
InnerKeyValue* innerkv = innerpm->keyValues[j];
if (innerkv != nullptr) {
free(innerkv->key);
free(innerkv->value);
free(innerkv);
}
}
free(innerpm->keyValues);
}
free(innerpm);
}
}
free(metrics->metrics);
}
free(metrics);
}
}
}

K8sContainerMeta LogtailPlugin::GetContainerMeta(const string& containerID) {
if (mPluginValid && mGetContainerMetaFun != nullptr) {
GoString id;
Expand Down
22 changes: 21 additions & 1 deletion core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ struct innerContainerMeta {
char** envsKey;
char** envsVal;
};

typedef struct {
char* key;
char* value;
} InnerKeyValue;

typedef struct {
InnerKeyValue** keyValues;
int count;
} InnerPluginMetric;

typedef struct {
InnerPluginMetric** metrics;
int count;
} InnerPluginMetrics;


struct K8sContainerMeta {
std::string PodName;
std::string K8sNamespace;
Expand Down Expand Up @@ -122,6 +139,7 @@ typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
typedef GoInt (*ProcessLogsFun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
typedef GoInt (*ProcessLogGroupFun)(GoString c, GoSlice l, GoString p);
typedef struct innerContainerMeta* (*GetContainerMetaFun)(GoString containerID);
typedef InnerPluginMetrics* (*GetPipelineMetricsFun)();

// Methods export by adapter.
typedef int (*IsValidToSendFun)(long long logstoreKey);
Expand Down Expand Up @@ -234,6 +252,8 @@ class LogtailPlugin {

K8sContainerMeta GetContainerMeta(const std::string& containerID);

void GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList);

private:
void* mPluginBasePtr;
void* mPluginAdapterPtr;
Expand All @@ -252,7 +272,7 @@ class LogtailPlugin {
ProcessLogsFun mProcessLogsFun;
ProcessLogGroupFun mProcessLogGroupFun;
GetContainerMetaFun mGetContainerMetaFun;

GetPipelineMetricsFun mGetPipelineMetricsFun;
// Configuration for plugin system in JSON format.
Json::Value mPluginCfg;

Expand Down
3 changes: 2 additions & 1 deletion core/monitor/LogtaiMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,15 @@ void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName,
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
const std::string& childPluginID,
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
MetricLabels& labels) {
labels.emplace_back(std::make_pair("project", projectName));
labels.emplace_back(std::make_pair("logstore", logstoreName));
labels.emplace_back(std::make_pair("region", region));
labels.emplace_back(std::make_pair("config_name", configName));
labels.emplace_back(std::make_pair("plugin_name", pluginName));
labels.emplace_back(std::make_pair("plugin_id", pluginID));
labels.emplace_back(std::make_pair("child_plugin_id", childPluginID));
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels) {
Expand Down Expand Up @@ -255,7 +257,6 @@ void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& log
MetricsRecord* tmp = mHead;
while (tmp) {
Log* logPtr = nullptr;
// for (auto &item: tmp->GetLabels()) {
for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
if (METRIC_FIELD_REGION == pair.first) {
Expand Down
1 change: 1 addition & 0 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class WriteMetrics {
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
const std::string& childPluginID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels);
MetricsRecord* DoSnapshot();
Expand Down
72 changes: 64 additions & 8 deletions core/monitor/MetricExportor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "config_manager/ConfigManager.h"
#include "LogFileProfiler.h"
#include "MetricConstants.h"
#include "go_pipeline/LogtailPlugin.h"
#include "app_config/AppConfig.h"

using namespace sls_logs;
using namespace std;
Expand All @@ -13,16 +15,53 @@ namespace logtail {

MetricExportor::MetricExportor() : mSendInterval(60), mLastSendTime(time(NULL) - (rand() % (mSendInterval / 10)) * 10) {}

void MetricExportor::PushMetrics(bool forceSend) {
int32_t curTime = time(NULL);
if (!forceSend && (curTime - mLastSendTime < mSendInterval)) {
return;
void MetricExportor::PushGoPluginMetrics() {
std::vector<std::map<std::string, std::string>> goPluginMetircsList;
LogtailPlugin::GetInstance()->GetPipelineMetrics(goPluginMetircsList);
std::map<std::string, sls_logs::LogGroup*> goLogGroupMap;

for (auto& item : goPluginMetircsList) {
std::string configName = "";
std::string region = METRIC_REGION_DEFAULT;
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
{
// get the config_name label
for (const auto& pair : item) {
if (pair.first == "label.config_name") {
configName = pair.second;
break;
}
}
if (!configName.empty()) {
// get region info by config_name
Config* config = ConfigManager::GetInstance()->FindConfigByName(configName);
if (config) {
region = config->mRegion;
}
}
}
Log* logPtr = nullptr;
auto LogGroupIter = goLogGroupMap.find(region);
if (LogGroupIter != goLogGroupMap.end()) {
sls_logs::LogGroup* logGroup = LogGroupIter->second;
logPtr = logGroup->add_logs();
} else {
sls_logs::LogGroup* logGroup = new sls_logs::LogGroup();
logPtr = logGroup->add_logs();
goLogGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(region, logGroup));
}
auto now = GetCurrentLogtailTime();
SetLogTime(logPtr,
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
for (const auto& pair : item) {
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(pair.first);
contentPtr->set_value(pair.second);
}
}
ReadMetrics::GetInstance()->UpdateMetrics();

std::map<std::string, sls_logs::LogGroup*> logGroupMap;
ReadMetrics::GetInstance()->ReadAsLogGroup(logGroupMap);
SendMetrics(goLogGroupMap);
}

void MetricExportor::SendMetrics(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) {
std::map<std::string, sls_logs::LogGroup*>::iterator iter;
for (iter = logGroupMap.begin(); iter != logGroupMap.end(); iter ++) {
sls_logs::LogGroup* logGroup = iter->second;
Expand All @@ -37,4 +76,21 @@ void MetricExportor::PushMetrics(bool forceSend) {
delete logGroup;
}
}

void MetricExportor::PushMetrics(bool forceSend) {
int32_t curTime = time(NULL);
if (!forceSend && (curTime - mLastSendTime < mSendInterval)) {
return;
}

if (LogtailPlugin::GetInstance()->IsPluginOpened()) {
PushGoPluginMetrics();
}

ReadMetrics::GetInstance()->UpdateMetrics();
std::map<std::string, sls_logs::LogGroup*> logGroupMap;
ReadMetrics::GetInstance()->ReadAsLogGroup(logGroupMap);

SendMetrics(logGroupMap);
}
}
2 changes: 2 additions & 0 deletions core/monitor/MetricExportor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class MetricExportor {
return ptr;
}
void PushMetrics(bool forceSend);
void PushGoPluginMetrics();
void SendMetrics(std::map<std::string, sls_logs::LogGroup*>& logGroupMap);

private:
MetricExportor();
Expand Down
48 changes: 36 additions & 12 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,21 @@
#include "processor/ProcessorDesensitizeNative.h"
#include "processor/ProcessorTagNative.h"
#include "processor/ProcessorFilterNative.h"
#include "plugin/instance/PluginInstance.h"


namespace logtail {

void genPluginAndNodeID(int& pluginIndex, bool lastOne, PluginInstance::PluginMeta& pluginMeta) {
pluginIndex ++;
int childPluginID = pluginIndex;
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
if (!lastOne) {
childPluginID = pluginIndex + 1;
}
pluginMeta.pluginID = std::to_string(pluginIndex);
pluginMeta.childPluginID = std::to_string(childPluginID);
}

bool Pipeline::Init(const PipelineConfig& config) {
mName = config.mConfigName;
mConfig = config;
Expand All @@ -41,15 +53,18 @@ bool Pipeline::Init(const PipelineConfig& config) {
mContext.SetRegion(config.mRegion);

int pluginIndex = 0;

PluginInstance::PluginMeta pluginMeta;
// Input plugin
pluginIndex++;

if (config.mLogType == STREAM_LOG || config.mLogType == PLUGIN_LOG) {
return true;
}

genPluginAndNodeID(pluginIndex, false, pluginMeta);
std::unique_ptr<ProcessorInstance> pluginGroupInfo = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorTagNative::sName, std::to_string(pluginIndex++));
ProcessorTagNative::sName, pluginMeta);
if (!InitAndAddProcessor(std::move(pluginGroupInfo), config)) {
return false;
}
Expand All @@ -59,35 +74,37 @@ bool Pipeline::Init(const PipelineConfig& config) {
}

std::unique_ptr<ProcessorInstance> pluginDecoder;
genPluginAndNodeID(pluginIndex, false, pluginMeta);
if (config.mLogType == JSON_LOG || !config.IsMultiline()) {
pluginDecoder = PluginRegistry::GetInstance()->CreateProcessor(ProcessorSplitLogStringNative::sName,
std::to_string(pluginIndex++));
pluginMeta);
} else {
pluginDecoder = PluginRegistry::GetInstance()->CreateProcessor(ProcessorSplitRegexNative::sName,
std::to_string(pluginIndex++));
pluginMeta);
}
if (!InitAndAddProcessor(std::move(pluginDecoder), config)) {
return false;
}

// APSARA_LOG, REGEX_LOG, STREAM_LOG, JSON_LOG, DELIMITER_LOG, PLUGIN_LOG
std::unique_ptr<ProcessorInstance> pluginParser;
genPluginAndNodeID(pluginIndex, false, pluginMeta);
switch (config.mLogType) {
case APSARA_LOG:
pluginParser = PluginRegistry::GetInstance()->CreateProcessor(ProcessorParseApsaraNative::sName,
std::to_string(pluginIndex++));
pluginMeta);
break;
case REGEX_LOG:
pluginParser = PluginRegistry::GetInstance()->CreateProcessor(ProcessorParseRegexNative::sName,
std::to_string(pluginIndex++));
pluginMeta);
break;
case JSON_LOG:
pluginParser = PluginRegistry::GetInstance()->CreateProcessor(ProcessorParseJsonNative::sName,
std::to_string(pluginIndex++));
pluginMeta);
break;
case DELIMITER_LOG:
pluginParser = PluginRegistry::GetInstance()->CreateProcessor(ProcessorParseDelimiterNative::sName,
std::to_string(pluginIndex++));
pluginMeta);
break;
default:
return false;
Expand All @@ -96,21 +113,28 @@ bool Pipeline::Init(const PipelineConfig& config) {
return false;
}

genPluginAndNodeID(pluginIndex, false, pluginMeta);
std::unique_ptr<ProcessorInstance> pluginTime = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorParseTimestampNative::sName, std::to_string(pluginIndex++));
ProcessorParseTimestampNative::sName, pluginMeta);
if (!InitAndAddProcessor(std::move(pluginTime), config)) {
return false;
}

if (!config.mSensitiveWordCastOptions.empty()) {
genPluginAndNodeID(pluginIndex, false, pluginMeta);
} else {
genPluginAndNodeID(pluginIndex, true, pluginMeta);
}
std::unique_ptr<ProcessorInstance> pluginFilter = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorFilterNative::sName, std::to_string(pluginIndex++));
ProcessorFilterNative::sName, pluginMeta);
if (!InitAndAddProcessor(std::move(pluginFilter), config)) {
return false;
}

if (!config.mSensitiveWordCastOptions.empty()) {
genPluginAndNodeID(pluginIndex, true, pluginMeta);
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<ProcessorInstance> pluginDesensitize = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorDesensitizeNative::sName, std::to_string(pluginIndex++));
ProcessorDesensitizeNative::sName, pluginMeta);
if (!InitAndAddProcessor(std::move(pluginDesensitize), config)) {
return false;
}
Expand All @@ -131,9 +155,9 @@ bool Pipeline::InitAndAddProcessor(std::unique_ptr<ProcessorInstance>&& processo
("CreateProcessor", ProcessorSplitRegexNative::sName)("Error", "Cannot find plugin"));
return false;
}
ComponentConfig componentConfig(processor->Id(), config);
ComponentConfig componentConfig(processor->Meta().pluginID, config);
if (!processor->Init(componentConfig, mContext)) {
LOG_ERROR(GetContext().GetLogger(), ("InitProcessor", processor->Id())("Error", "Init failed"));
LOG_ERROR(GetContext().GetLogger(), ("InitProcessor", processor->Meta().pluginID)("Error", "Init failed"));
return false;
}
mProcessorLine.emplace_back(std::move(processor));
Expand Down
8 changes: 4 additions & 4 deletions core/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@ void PluginRegistry::UnloadPlugins() {
}

std::unique_ptr<ProcessorInstance> PluginRegistry::CreateProcessor(const std::string& name,
const std::string& pluginId) {
const PluginInstance::PluginMeta& pluginMeta) {
return std::unique_ptr<ProcessorInstance>(
static_cast<ProcessorInstance*>(Create(PROCESSOR_PLUGIN, name, pluginId).release()));
static_cast<ProcessorInstance*>(Create(PROCESSOR_PLUGIN, name, pluginMeta).release()));
}

std::unique_ptr<PluginInstance>
PluginRegistry::Create(PluginCat cat, const std::string& name, const std::string& pluginId) {
PluginRegistry::Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta) {
std::unique_ptr<PluginInstance> ins;
auto creatorEntry = mPluginDict.find(PluginKey(cat, name));
if (creatorEntry != mPluginDict.end()) {
ins = creatorEntry->second->Create(pluginId);
ins = creatorEntry->second->Create(pluginMeta);
}
return ins;
}
Expand Down
Loading
Loading