Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add self monitor metric inner pipeline #1913

Merged
merged 15 commits into from
Nov 27, 2024
8 changes: 1 addition & 7 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "logger/Logger.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
Expand All @@ -67,7 +66,6 @@
DEFINE_FLAG_BOOL(ilogtail_disable_core, "disable core in worker process", true);
DEFINE_FLAG_INT32(file_tags_update_interval, "second", 1);
DEFINE_FLAG_INT32(config_scan_interval, "seconds", 10);
DEFINE_FLAG_INT32(profiling_check_interval, "seconds", 60);
DEFINE_FLAG_INT32(tcmalloc_release_memory_interval, "force release memory held by tcmalloc, seconds", 300);
DEFINE_FLAG_INT32(exit_flushout_duration, "exit process flushout duration", 20 * 1000);
DEFINE_FLAG_INT32(queue_check_gc_interval_sec, "30s", 30);
Expand Down Expand Up @@ -266,7 +264,7 @@ void Application::Start() { // GCOVR_EXCL_START

ProcessorRunner::GetInstance()->Init();

time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
Expand All @@ -291,10 +289,6 @@ void Application::Start() { // GCOVR_EXCL_START
}
lastConfigCheckTime = curTime;
}
if (curTime - lastProfilingCheckTime >= INT32_FLAG(profiling_check_interval)) {
MetricExportor::GetInstance()->PushMetrics(false);
lastProfilingCheckTime = curTime;
}
#ifndef LOGTAIL_NO_TC_MALLOC
if (curTime - lastTcmallocReleaseMemTime >= INT32_FLAG(tcmalloc_release_memory_interval)) {
MallocExtension::instance()->ReleaseFreeMemory();
Expand Down
1 change: 1 addition & 0 deletions core/config/watcher/ConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ void ConfigWatcher::AddSource(const string& dir, mutex* mux) {
void ConfigWatcher::ClearEnvironment() {
mSourceDir.clear();
mFileInfoMap.clear();
mInnerConfigMap.clear();
}
#endif

Expand Down
1 change: 1 addition & 0 deletions core/config/watcher/ConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ConfigWatcher {
std::vector<std::filesystem::path> mSourceDir;
std::map<std::string, std::mutex*> mDirMutexMap;
std::map<std::string, std::pair<uintmax_t, std::filesystem::file_time_type>> mFileInfoMap;
std::map<std::string, std::string> mInnerConfigMap;
};

} // namespace logtail
173 changes: 133 additions & 40 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
#include "config/watcher/PipelineConfigWatcher.h"

#include <memory>
#include <unordered_set>

#include "common/FileSystemUtil.h"
#include "config/ConfigUtil.h"
#include "logger/Logger.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
#include "task_pipeline/TaskPipelineManager.h"

Expand All @@ -37,6 +37,138 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
// inner configs
InsertInnerPipelines(pDiff, tDiff, configSet);
// configs from file
InsertPipelines(pDiff, tDiff, configSet);

for (const auto& name : mPipelineManager->GetAllConfigNames()) {
if (configSet.find(name) == configSet.end()) {
pDiff.mRemoved.push_back(name);
LOG_INFO(sLogger,
("existing valid config is removed", "prepare to stop current running pipeline")("config", name));
}
}
for (const auto& name : mTaskPipelineManager->GetAllPipelineNames()) {
if (configSet.find(name) == configSet.end()) {
tDiff.mRemoved.push_back(name);
LOG_INFO(sLogger,
("existing valid config is removed", "prepare to stop current running task")("config", name));
}
}
for (auto it = mFileInfoMap.begin(); it != mFileInfoMap.end();) {
string configName = filesystem::path(it->first).stem().string();
if (configSet.find(configName) == configSet.end()) {
it = mFileInfoMap.erase(it);
} else {
++it;
}
}

if (!pDiff.IsEmpty()) {
LOG_INFO(sLogger,
("config files scan done", "got updates, begin to update pipelines")("added", pDiff.mAdded.size())(
"modified", pDiff.mModified.size())("removed", pDiff.mRemoved.size()));
} else {
LOG_DEBUG(sLogger, ("config files scan done", "no pipeline update"));
}
if (!tDiff.IsEmpty()) {
LOG_INFO(sLogger,
("config files scan done", "got updates, begin to update tasks")("added", tDiff.mAdded.size())(
"modified", tDiff.mModified.size())("removed", tDiff.mRemoved.size()));
} else {
LOG_DEBUG(sLogger, ("config files scan done", "no task update"));
}

return make_pair(std::move(pDiff), std::move(tDiff));
}

void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
std::map<std::string, std::string> innerPipelines;
// self-monitor metric
innerPipelines[LoongCollectorMonitor::GetInnerSelfMonitorMetricPipelineName()]
= LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline();

// process
for (const auto& pipeline : innerPipelines) {
if (configSet.find(pipeline.first) != configSet.end()) {
LOG_WARNING(sLogger,
("more than 1 config with the same name is found", "skip current config")("inner pipeline",
pipeline.first));
continue;
}
configSet.insert(pipeline.first);

string errorMsg;
auto iter = mInnerConfigMap.find(pipeline.first);
if (iter == mInnerConfigMap.end()) {
mInnerConfigMap[pipeline.first] = pipeline.second;
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) {
LOG_WARNING(sLogger,
("config format error", "skip current object")("error msg", errorMsg)("inner pipeline",
pipeline.first));
continue;
}
if (!IsConfigEnabled(pipeline.first, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipeline.first));
continue;
}
if (!CheckAddedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) {
continue;
}
} else if (pipeline.second != iter->second) {
mInnerConfigMap[pipeline.first] = pipeline.second;
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) {
LOG_WARNING(sLogger,
("config format error", "skip current object")("error msg", errorMsg)("inner pipeline",
pipeline.first));
continue;
}
if (!IsConfigEnabled(pipeline.first, *detail)) {
switch (GetConfigType(*detail)) {
case ConfigType::Pipeline:
if (mPipelineManager->FindConfigByName(pipeline.first)) {
pDiff.mRemoved.push_back(pipeline.first);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running pipeline")("config", pipeline.first));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", pipeline.first));
}
break;
case ConfigType::Task:
if (mTaskPipelineManager->FindPipelineByName(pipeline.first)) {
tDiff.mRemoved.push_back(pipeline.first);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running task")("config", pipeline.first));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", pipeline.first));
}
break;
}
continue;
}
if (!CheckModifiedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object"));
}
}
}

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet) {
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
Expand Down Expand Up @@ -139,45 +271,6 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
}
}
}
for (const auto& name : mPipelineManager->GetAllConfigNames()) {
if (configSet.find(name) == configSet.end()) {
pDiff.mRemoved.push_back(name);
LOG_INFO(sLogger,
("existing valid config is removed", "prepare to stop current running pipeline")("config", name));
}
}
for (const auto& name : mTaskPipelineManager->GetAllPipelineNames()) {
if (configSet.find(name) == configSet.end()) {
tDiff.mRemoved.push_back(name);
LOG_INFO(sLogger,
("existing valid config is removed", "prepare to stop current running task")("config", name));
}
}
for (auto it = mFileInfoMap.begin(); it != mFileInfoMap.end();) {
string configName = filesystem::path(it->first).stem().string();
if (configSet.find(configName) == configSet.end()) {
it = mFileInfoMap.erase(it);
} else {
++it;
}
}

if (!pDiff.IsEmpty()) {
LOG_INFO(sLogger,
("config files scan done", "got updates, begin to update pipelines")("added", pDiff.mAdded.size())(
"modified", pDiff.mModified.size())("removed", pDiff.mRemoved.size()));
} else {
LOG_DEBUG(sLogger, ("config files scan done", "no pipeline update"));
}
if (!tDiff.IsEmpty()) {
LOG_INFO(sLogger,
("config files scan done", "got updates, begin to update tasks")("added", tDiff.mAdded.size())(
"modified", tDiff.mModified.size())("removed", tDiff.mRemoved.size()));
} else {
LOG_DEBUG(sLogger, ("config files scan done", "no task update"));
}

return make_pair(std::move(pDiff), std::move(tDiff));
}

bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
Expand Down
4 changes: 4 additions & 0 deletions core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <unordered_set>

#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"

Expand Down Expand Up @@ -44,6 +46,8 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
Expand Down
1 change: 0 additions & 1 deletion core/file_server/EventDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingModify.h"
#include "monitor/AlarmManager.h"
#include "monitor/MetricExportor.h"
#include "protobuf/sls/metric.pb.h"
#include "protobuf/sls/sls_logs.pb.h"
#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
Loading
Loading