Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization of the self-monitoring metrics architecture for Go (Golang) plugin modules and unified output of metrics with C++ modules. #1290

Merged
merged 42 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0daec5e
add metric code
linrunqi08 Dec 27, 2023
9847679
add cpp call go
linrunqi08 Dec 27, 2023
30810be
update call func
linrunqi08 Dec 27, 2023
b8559c3
tmp debug
linrunqi08 Dec 27, 2023
f26e1a8
tmp save
linrunqi08 Dec 28, 2023
e436d75
tmp save
linrunqi08 Dec 28, 2023
7c38780
add flusher
linrunqi08 Dec 28, 2023
33ea6a8
add flusher
linrunqi08 Dec 28, 2023
9617fb7
fix cpp interface
linrunqi08 Dec 29, 2023
1c7da0f
refine code
linrunqi08 Dec 29, 2023
1ad21ef
refine code
linrunqi08 Dec 29, 2023
465b54c
refine code
linrunqi08 Dec 29, 2023
1ec3dd3
fix comments
linrunqi08 Jan 2, 2024
c1d90cf
fix comment
linrunqi08 Jan 2, 2024
d8c2263
fix comment
linrunqi08 Jan 2, 2024
77ffaaa
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 2, 2024
de51063
fix comments
linrunqi08 Jan 9, 2024
e19a9bd
fix build
linrunqi08 Jan 9, 2024
09136c2
add node id
linrunqi08 Jan 12, 2024
9ba425d
add child plugin id
linrunqi08 Jan 12, 2024
e104daa
fix metric
linrunqi08 Jan 15, 2024
96e8296
fix unittest
linrunqi08 Jan 15, 2024
2107d4f
fix go lint
linrunqi08 Jan 15, 2024
3b351c8
fix go lint
linrunqi08 Jan 15, 2024
75c0f4f
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 15, 2024
9bb08c6
merge 1.8
linrunqi08 Jan 15, 2024
81481bd
fix ut
linrunqi08 Jan 15, 2024
d70ad57
try fix e2e
linrunqi08 Jan 15, 2024
2ebca2f
fix comments
linrunqi08 Jan 15, 2024
b12a775
fix comments
linrunqi08 Jan 15, 2024
20480fe
Merge branch '1.8' of https://github.com/alibaba/ilogtail into featur…
linrunqi08 Jan 16, 2024
d18453e
add comments
linrunqi08 Jan 22, 2024
0a2d671
fix comments
linrunqi08 Jan 22, 2024
192337b
fix comments
linrunqi08 Jan 22, 2024
3d7964b
fix comments
linrunqi08 Jan 22, 2024
2b401c1
fix comments
linrunqi08 Jan 22, 2024
c4ecf69
fix comments
linrunqi08 Jan 22, 2024
a46c523
fix lint
linrunqi08 Jan 22, 2024
d2aa7d3
fix ut
linrunqi08 Jan 22, 2024
e216f7e
fix c++ pluginID
linrunqi08 Jan 22, 2024
81e2942
fix comments
linrunqi08 Jan 23, 2024
fa9eea1
fix ut
linrunqi08 Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,31 +388,37 @@ bool LogtailPlugin::LoadPluginBase() {
return mPluginValid;
}
}
// 加载全局配置,目前应该没有调用点
mLoadGlobalConfigFun = (LoadGlobalConfigFun)loader.LoadMethod("LoadGlobalConfig", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load LoadGlobalConfig error, Message", error));
return mPluginValid;
}
// 加载单个配置,目前应该是Resume的时候,全量加载一次
mLoadConfigFun = (LoadConfigFun)loader.LoadMethod("LoadConfig", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load LoadConfig error, Message", error));
return mPluginValid;
}
// 更新配置,目前应该没有调用点
mUnloadConfigFun = (UnloadConfigFun)loader.LoadMethod("UnloadConfig", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load UnloadConfig error, Message", error));
return mPluginValid;
}
// 插件暂停
mHoldOnFun = (HoldOnFun)loader.LoadMethod("HoldOn", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load HoldOn error, Message", error));
return mPluginValid;
}
// 插件恢复
mResumeFun = (ResumeFun)loader.LoadMethod("Resume", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load Resume error, Message", error));
return mPluginValid;
}
// C++传递原始二进制数据到golang插件,v1和v2的区别:是否传递tag
mProcessRawLogFun = (ProcessRawLogFun)loader.LoadMethod("ProcessRawLog", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load ProcessRawLog error, Message", error));
Expand All @@ -423,22 +429,30 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_ERROR(sLogger, ("load ProcessRawLogV2 error, Message", error));
return mPluginValid;
}

// C++获取容器信息的
mGetContainerMetaFun = (GetContainerMetaFun)loader.LoadMethod("GetContainerMeta", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load GetContainerMeta error, Message", error));
return mPluginValid;
}
// C++传递单条数据到golang插件
mProcessLogsFun = (ProcessLogsFun)loader.LoadMethod("ProcessLog", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load ProcessLogs error, Message", error));
return mPluginValid;
}
// C++传递数据到golang插件
mProcessLogGroupFun = (ProcessLogGroupFun)loader.LoadMethod("ProcessLogGroup", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load ProcessLogGroup error, Message", error));
return mPluginValid;
}
// 获取golang插件部分统计信息
mGetPipelineMetricsFun = (GetPipelineMetricsFun)loader.LoadMethod("GetPipelineMetrics", error);
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
if (!error.empty()) {
LOG_ERROR(sLogger, ("load GetPipelineMetrics error, Message", error));
return mPluginValid;
}

mPluginBasePtr = loader.Release();
}
Expand Down Expand Up @@ -517,6 +531,34 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
}
}

void LogtailPlugin::GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList) {
if (mGetPipelineMetricsFun != nullptr) {
auto metrics = mGetPipelineMetricsFun();
if (metrics != nullptr) {
for (int i = 0; i < metrics->count; ++i) {
std::map<std::string, std::string> item;
InnerPluginMetric* innerpm = metrics->metrics[i];
if (innerpm != nullptr) {
for (int j = 0; j < innerpm->count; ++j) {
InnerKeyValue* innerkv = innerpm->keyValues[j];
if (innerkv != nullptr) {
item.insert(std::make_pair(std::string(innerkv->key), std::string(innerkv->value)));
free(innerkv->key);
free(innerkv->value);
free(innerkv);
}
}
free(innerpm->keyValues);
free(innerpm);
}
metircsList.emplace_back(item);
}
free(metrics->metrics);
free(metrics);
}
}
}

K8sContainerMeta LogtailPlugin::GetContainerMeta(const string& containerID) {
if (mPluginValid && mGetContainerMetaFun != nullptr) {
GoString id;
Expand Down
22 changes: 21 additions & 1 deletion core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ struct innerContainerMeta {
char** envsKey;
char** envsVal;
};

typedef struct {
char* key;
char* value;
} InnerKeyValue;

typedef struct {
InnerKeyValue** keyValues;
int count;
} InnerPluginMetric;

typedef struct {
InnerPluginMetric** metrics;
int count;
} InnerPluginMetrics;


struct K8sContainerMeta {
std::string PodName;
std::string K8sNamespace;
Expand Down Expand Up @@ -122,6 +139,7 @@ typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
typedef GoInt (*ProcessLogsFun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
typedef GoInt (*ProcessLogGroupFun)(GoString c, GoSlice l, GoString p);
typedef struct innerContainerMeta* (*GetContainerMetaFun)(GoString containerID);
typedef InnerPluginMetrics* (*GetPipelineMetricsFun)();

// Methods export by adapter.
typedef int (*IsValidToSendFun)(long long logstoreKey);
Expand Down Expand Up @@ -234,6 +252,8 @@ class LogtailPlugin {

K8sContainerMeta GetContainerMeta(const std::string& containerID);

void GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList);

private:
void* mPluginBasePtr;
void* mPluginAdapterPtr;
Expand All @@ -252,7 +272,7 @@ class LogtailPlugin {
ProcessLogsFun mProcessLogsFun;
ProcessLogGroupFun mProcessLogGroupFun;
GetContainerMetaFun mGetContainerMetaFun;

GetPipelineMetricsFun mGetPipelineMetricsFun;
// Configuration for plugin system in JSON format.
Json::Value mPluginCfg;

Expand Down
5 changes: 4 additions & 1 deletion core/monitor/LogtaiMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,17 @@ void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName,
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels) {
labels.emplace_back(std::make_pair("project", projectName));
labels.emplace_back(std::make_pair("logstore", logstoreName));
labels.emplace_back(std::make_pair("region", region));
labels.emplace_back(std::make_pair("config_name", configName));
labels.emplace_back(std::make_pair("plugin_name", pluginName));
labels.emplace_back(std::make_pair("plugin_id", pluginID));
labels.emplace_back(std::make_pair("node_id", nodeID));
labels.emplace_back(std::make_pair("child_node_id", childNodeID));
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels) {
Expand Down Expand Up @@ -255,7 +259,6 @@ void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& log
MetricsRecord* tmp = mHead;
while (tmp) {
Log* logPtr = nullptr;
// for (auto &item: tmp->GetLabels()) {
for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
if (METRIC_FIELD_REGION == pair.first) {
Expand Down
2 changes: 2 additions & 0 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class WriteMetrics {
const std::string& configName,
const std::string& pluginName,
const std::string& pluginID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels);
MetricsRecord* DoSnapshot();
Expand Down
72 changes: 64 additions & 8 deletions core/monitor/MetricExportor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "config_manager/ConfigManager.h"
#include "LogFileProfiler.h"
#include "MetricConstants.h"
#include "go_pipeline/LogtailPlugin.h"
#include "app_config/AppConfig.h"

using namespace sls_logs;
using namespace std;
Expand All @@ -13,16 +15,53 @@ namespace logtail {

MetricExportor::MetricExportor() : mSendInterval(60), mLastSendTime(time(NULL) - (rand() % (mSendInterval / 10)) * 10) {}

void MetricExportor::PushMetrics(bool forceSend) {
int32_t curTime = time(NULL);
if (!forceSend && (curTime - mLastSendTime < mSendInterval)) {
return;
void MetricExportor::PushGoPluginMetrics() {
std::vector<std::map<std::string, std::string>> goPluginMetircsList;
LogtailPlugin::GetInstance()->GetPipelineMetrics(goPluginMetircsList);
std::map<std::string, sls_logs::LogGroup*> goLogGroupMap;

for (auto& item : goPluginMetircsList) {
std::string configName = "";
std::string region = METRIC_REGION_DEFAULT;
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
{
// get the config_name label
for (const auto& pair : item) {
if (pair.first == "label.config_name") {
configName = pair.second;
break;
}
}
if (!configName.empty()) {
// get region info by config_name
Config* config = ConfigManager::GetInstance()->FindConfigByName(configName);
if (config) {
region = config->mRegion;
}
}
}
Log* logPtr = nullptr;
auto LogGroupIter = goLogGroupMap.find(region);
if (LogGroupIter != goLogGroupMap.end()) {
sls_logs::LogGroup* logGroup = LogGroupIter->second;
logPtr = logGroup->add_logs();
} else {
sls_logs::LogGroup* logGroup = new sls_logs::LogGroup();
logPtr = logGroup->add_logs();
goLogGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(region, logGroup));
}
auto now = GetCurrentLogtailTime();
SetLogTime(logPtr,
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
for (const auto& pair : item) {
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(pair.first);
contentPtr->set_value(pair.second);
}
}
ReadMetrics::GetInstance()->UpdateMetrics();

std::map<std::string, sls_logs::LogGroup*> logGroupMap;
ReadMetrics::GetInstance()->ReadAsLogGroup(logGroupMap);
SendMetrics(goLogGroupMap);
}

void MetricExportor::SendMetrics(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) {
std::map<std::string, sls_logs::LogGroup*>::iterator iter;
for (iter = logGroupMap.begin(); iter != logGroupMap.end(); iter ++) {
sls_logs::LogGroup* logGroup = iter->second;
Expand All @@ -37,4 +76,21 @@ void MetricExportor::PushMetrics(bool forceSend) {
delete logGroup;
}
}

void MetricExportor::PushMetrics(bool forceSend) {
int32_t curTime = time(NULL);
if (!forceSend && (curTime - mLastSendTime < mSendInterval)) {
return;
}

if (LogtailPlugin::GetInstance()->IsPluginOpened()) {
PushGoPluginMetrics();
}

ReadMetrics::GetInstance()->UpdateMetrics();
std::map<std::string, sls_logs::LogGroup*> logGroupMap;
ReadMetrics::GetInstance()->ReadAsLogGroup(logGroupMap);

SendMetrics(logGroupMap);
}
}
2 changes: 2 additions & 0 deletions core/monitor/MetricExportor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class MetricExportor {
return ptr;
}
void PushMetrics(bool forceSend);
void PushGoPluginMetrics();
void SendMetrics(std::map<std::string, sls_logs::LogGroup*>& logGroupMap);

private:
MetricExportor();
Expand Down
Loading
Loading