diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 42c0fbc3f3..09901cced9 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -40,7 +40,6 @@ #include "file_server/event_handler/LogInput.h" #include "go_pipeline/LogtailPlugin.h" #include "logger/Logger.h" -#include "monitor/MetricExportor.h" #include "monitor/Monitor.h" #include "pipeline/PipelineManager.h" #include "pipeline/plugin/PluginRegistry.h" @@ -67,7 +66,6 @@ DEFINE_FLAG_BOOL(ilogtail_disable_core, "disable core in worker process", true); DEFINE_FLAG_INT32(file_tags_update_interval, "second", 1); DEFINE_FLAG_INT32(config_scan_interval, "seconds", 10); -DEFINE_FLAG_INT32(profiling_check_interval, "seconds", 60); DEFINE_FLAG_INT32(tcmalloc_release_memory_interval, "force release memory held by tcmalloc, seconds", 300); DEFINE_FLAG_INT32(exit_flushout_duration, "exit process flushout duration", 20 * 1000); DEFINE_FLAG_INT32(queue_check_gc_interval_sec, "30s", 30); @@ -266,7 +264,7 @@ void Application::Start() { // GCOVR_EXCL_START ProcessorRunner::GetInstance()->Init(); - time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, + time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0; #ifndef LOGTAIL_NO_TC_MALLOC time_t lastTcmallocReleaseMemTime = 0; @@ -291,10 +289,6 @@ void Application::Start() { // GCOVR_EXCL_START } lastConfigCheckTime = curTime; } - if (curTime - lastProfilingCheckTime >= INT32_FLAG(profiling_check_interval)) { - MetricExportor::GetInstance()->PushMetrics(false); - lastProfilingCheckTime = curTime; - } #ifndef LOGTAIL_NO_TC_MALLOC if (curTime - lastTcmallocReleaseMemTime >= INT32_FLAG(tcmalloc_release_memory_interval)) { MallocExtension::instance()->ReleaseFreeMemory(); diff --git a/core/config/watcher/ConfigWatcher.cpp b/core/config/watcher/ConfigWatcher.cpp index fae78869b0..dd29d2ff63 100644 --- a/core/config/watcher/ConfigWatcher.cpp +++ b/core/config/watcher/ConfigWatcher.cpp @@ -29,6 +29,7 @@ void ConfigWatcher::AddSource(const string& dir, mutex* mux) { void ConfigWatcher::ClearEnvironment() { mSourceDir.clear(); mFileInfoMap.clear(); + mInnerConfigMap.clear(); } #endif diff --git a/core/config/watcher/ConfigWatcher.h b/core/config/watcher/ConfigWatcher.h index 0b43d18cbb..b95a0f611d 100644 --- a/core/config/watcher/ConfigWatcher.h +++ b/core/config/watcher/ConfigWatcher.h @@ -43,6 +43,7 @@ class ConfigWatcher { std::vector mSourceDir; std::map mDirMutexMap; std::map> mFileInfoMap; + std::map mInnerConfigMap; }; } // namespace logtail diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 40118cbc47..7c91068b72 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -15,11 +15,11 @@ #include "config/watcher/PipelineConfigWatcher.h" #include -#include #include "common/FileSystemUtil.h" #include "config/ConfigUtil.h" #include "logger/Logger.h" +#include "monitor/Monitor.h" #include "pipeline/PipelineManager.h" #include "task_pipeline/TaskPipelineManager.h" @@ -37,6 +37,138 @@ pair PipelineConfigWatcher::CheckConfigDiff( PipelineConfigDiff pDiff; TaskConfigDiff tDiff; unordered_set configSet; + // inner configs + InsertInnerPipelines(pDiff, tDiff, configSet); + // configs from file + InsertPipelines(pDiff, tDiff, configSet); + + for (const auto& name : mPipelineManager->GetAllConfigNames()) { + if (configSet.find(name) == configSet.end()) { + pDiff.mRemoved.push_back(name); + LOG_INFO(sLogger, + ("existing valid config is removed", "prepare to stop current running pipeline")("config", name)); + } + } + for (const auto& name : mTaskPipelineManager->GetAllPipelineNames()) { + if (configSet.find(name) == configSet.end()) { + tDiff.mRemoved.push_back(name); + LOG_INFO(sLogger, + ("existing valid config is removed", "prepare to stop current running task")("config", name)); + } + } + for (auto it = mFileInfoMap.begin(); it != mFileInfoMap.end();) { + string configName = filesystem::path(it->first).stem().string(); + if (configSet.find(configName) == configSet.end()) { + it = mFileInfoMap.erase(it); + } else { + ++it; + } + } + + if (!pDiff.IsEmpty()) { + LOG_INFO(sLogger, + ("config files scan done", "got updates, begin to update pipelines")("added", pDiff.mAdded.size())( + "modified", pDiff.mModified.size())("removed", pDiff.mRemoved.size())); + } else { + LOG_DEBUG(sLogger, ("config files scan done", "no pipeline update")); + } + if (!tDiff.IsEmpty()) { + LOG_INFO(sLogger, + ("config files scan done", "got updates, begin to update tasks")("added", tDiff.mAdded.size())( + "modified", tDiff.mModified.size())("removed", tDiff.mRemoved.size())); + } else { + LOG_DEBUG(sLogger, ("config files scan done", "no task update")); + } + + return make_pair(std::move(pDiff), std::move(tDiff)); +} + +void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + unordered_set& configSet) { + std::map innerPipelines; + // self-monitor metric + innerPipelines[LoongCollectorMonitor::GetInnerSelfMonitorMetricPipelineName()] + = LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline(); + + // process + for (const auto& pipeline : innerPipelines) { + if (configSet.find(pipeline.first) != configSet.end()) { + LOG_WARNING(sLogger, + ("more than 1 config with the same name is found", "skip current config")("inner pipeline", + pipeline.first)); + continue; + } + configSet.insert(pipeline.first); + + string errorMsg; + auto iter = mInnerConfigMap.find(pipeline.first); + if (iter == mInnerConfigMap.end()) { + mInnerConfigMap[pipeline.first] = pipeline.second; + unique_ptr detail = make_unique(); + if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) { + LOG_WARNING(sLogger, + ("config format error", "skip current object")("error msg", errorMsg)("inner pipeline", + pipeline.first)); + continue; + } + if (!IsConfigEnabled(pipeline.first, *detail)) { + LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipeline.first)); + continue; + } + if (!CheckAddedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) { + continue; + } + } else if (pipeline.second != iter->second) { + mInnerConfigMap[pipeline.first] = pipeline.second; + unique_ptr detail = make_unique(); + if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) { + LOG_WARNING(sLogger, + ("config format error", "skip current object")("error msg", errorMsg)("inner pipeline", + pipeline.first)); + continue; + } + if (!IsConfigEnabled(pipeline.first, *detail)) { + switch (GetConfigType(*detail)) { + case ConfigType::Pipeline: + if (mPipelineManager->FindConfigByName(pipeline.first)) { + pDiff.mRemoved.push_back(pipeline.first); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running pipeline")("config", pipeline.first)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", + "skip current object")("config", pipeline.first)); + } + break; + case ConfigType::Task: + if (mTaskPipelineManager->FindPipelineByName(pipeline.first)) { + tDiff.mRemoved.push_back(pipeline.first); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running task")("config", pipeline.first)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", + "skip current object")("config", pipeline.first)); + } + break; + } + continue; + } + if (!CheckModifiedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) { + continue; + } + } else { + LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object")); + } + } +} + +void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + std::unordered_set& configSet) { for (const auto& dir : mSourceDir) { error_code ec; filesystem::file_status s = filesystem::status(dir, ec); @@ -139,45 +271,6 @@ pair PipelineConfigWatcher::CheckConfigDiff( } } } - for (const auto& name : mPipelineManager->GetAllConfigNames()) { - if (configSet.find(name) == configSet.end()) { - pDiff.mRemoved.push_back(name); - LOG_INFO(sLogger, - ("existing valid config is removed", "prepare to stop current running pipeline")("config", name)); - } - } - for (const auto& name : mTaskPipelineManager->GetAllPipelineNames()) { - if (configSet.find(name) == configSet.end()) { - tDiff.mRemoved.push_back(name); - LOG_INFO(sLogger, - ("existing valid config is removed", "prepare to stop current running task")("config", name)); - } - } - for (auto it = mFileInfoMap.begin(); it != mFileInfoMap.end();) { - string configName = filesystem::path(it->first).stem().string(); - if (configSet.find(configName) == configSet.end()) { - it = mFileInfoMap.erase(it); - } else { - ++it; - } - } - - if (!pDiff.IsEmpty()) { - LOG_INFO(sLogger, - ("config files scan done", "got updates, begin to update pipelines")("added", pDiff.mAdded.size())( - "modified", pDiff.mModified.size())("removed", pDiff.mRemoved.size())); - } else { - LOG_DEBUG(sLogger, ("config files scan done", "no pipeline update")); - } - if (!tDiff.IsEmpty()) { - LOG_INFO(sLogger, - ("config files scan done", "got updates, begin to update tasks")("added", tDiff.mAdded.size())( - "modified", tDiff.mModified.size())("removed", tDiff.mRemoved.size())); - } else { - LOG_DEBUG(sLogger, ("config files scan done", "no task update")); - } - - return make_pair(std::move(pDiff), std::move(tDiff)); } bool PipelineConfigWatcher::CheckAddedConfig(const string& configName, diff --git a/core/config/watcher/PipelineConfigWatcher.h b/core/config/watcher/PipelineConfigWatcher.h index d1f77967fe..28a7f00f97 100644 --- a/core/config/watcher/PipelineConfigWatcher.h +++ b/core/config/watcher/PipelineConfigWatcher.h @@ -16,6 +16,8 @@ #pragma once +#include + #include "config/ConfigDiff.h" #include "config/watcher/ConfigWatcher.h" @@ -44,6 +46,8 @@ class PipelineConfigWatcher : public ConfigWatcher { PipelineConfigWatcher(); ~PipelineConfigWatcher() = default; + void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); + void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); bool CheckAddedConfig(const std::string& configName, std::unique_ptr&& configDetail, PipelineConfigDiff& pDiff, diff --git a/core/file_server/EventDispatcher.cpp b/core/file_server/EventDispatcher.cpp index 3585bdc1f3..7c1869f67e 100644 --- a/core/file_server/EventDispatcher.cpp +++ b/core/file_server/EventDispatcher.cpp @@ -47,7 +47,6 @@ #include "file_server/polling/PollingDirFile.h" #include "file_server/polling/PollingModify.h" #include "monitor/AlarmManager.h" -#include "monitor/MetricExportor.h" #include "protobuf/sls/metric.pb.h" #include "protobuf/sls/sls_logs.pb.h" #ifdef APSARA_UNIT_TEST_MAIN diff --git a/core/monitor/MetricExportor.cpp b/core/monitor/MetricExportor.cpp deleted file mode 100644 index d6417bdc00..0000000000 --- a/core/monitor/MetricExportor.cpp +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright 2023 iLogtail Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "MetricExportor.h" - -#include - -#include "MetricConstants.h" -#include "MetricManager.h" -#include "app_config/AppConfig.h" -#include "common/FileSystemUtil.h" -#include "common/RuntimeUtil.h" -#include "common/TimeUtil.h" -#include "go_pipeline/LogtailPlugin.h" -#include "pipeline/PipelineManager.h" -#include "protobuf/sls/sls_logs.pb.h" - -using namespace sls_logs; -using namespace std; - -DECLARE_FLAG_STRING(metrics_report_method); - -namespace logtail { - -const string METRIC_REGION_DEFAULT = "default"; -const string METRIC_SLS_LOGSTORE_NAME = "shennong_log_profile"; -const string METRIC_TOPIC_TYPE = "loong_collector_metric"; - -const std::string METRIC_EXPORT_TYPE_GO = "direct"; -const std::string METRIC_EXPORT_TYPE_CPP = "cpp_provided"; - -MetricExportor::MetricExportor() : mSendInterval(60), mLastSendTime(time(NULL) - (rand() % (mSendInterval / 10)) * 10) { -} - -void MetricExportor::PushMetrics(bool forceSend) { - int32_t curTime = time(NULL); - if (!forceSend && (curTime - mLastSendTime < mSendInterval)) { - return; - } - - // go指标在Cpp指标前获取,是为了在 Cpp 部分指标做 SnapShot - // 前(即调用 ReadMetrics::GetInstance()->UpdateMetrics() 函数),把go部分的进程级指标填写到 Cpp - // 的进程级指标中去,随Cpp的进程级指标一起输出 - if (LogtailPlugin::GetInstance()->IsPluginOpened()) { - PushGoMetrics(); - } - PushCppMetrics(); -} - -void MetricExportor::PushCppMetrics() { - ReadMetrics::GetInstance()->UpdateMetrics(); - - if ("sls" == STRING_FLAG(metrics_report_method)) { - std::map logGroupMap; - ReadMetrics::GetInstance()->ReadAsLogGroup(METRIC_LABEL_KEY_REGION, METRIC_REGION_DEFAULT, logGroupMap); - SendToSLS(logGroupMap); - } else if ("file" == STRING_FLAG(metrics_report_method)) { - std::string metricsContent; - ReadMetrics::GetInstance()->ReadAsFileBuffer(metricsContent); - SendToLocalFile(metricsContent, "self-metrics-cpp"); - } -} - -void MetricExportor::PushGoMetrics() { - std::vector> goDirectMetircsList; - LogtailPlugin::GetInstance()->GetGoMetrics(goDirectMetircsList, METRIC_EXPORT_TYPE_GO); - std::vector> goCppProvidedMetircsList; - LogtailPlugin::GetInstance()->GetGoMetrics(goCppProvidedMetircsList, METRIC_EXPORT_TYPE_CPP); - - PushGoCppProvidedMetrics(goCppProvidedMetircsList); - PushGoDirectMetrics(goDirectMetircsList); -} - -void MetricExportor::SendToSLS(std::map& logGroupMap) { - std::map::iterator iter; - for (iter = logGroupMap.begin(); iter != logGroupMap.end(); iter++) { - sls_logs::LogGroup* logGroup = iter->second; - logGroup->set_category(METRIC_SLS_LOGSTORE_NAME); - logGroup->set_source(LoongCollectorMonitor::mIpAddr); - logGroup->set_topic(METRIC_TOPIC_TYPE); - if (METRIC_REGION_DEFAULT == iter->first) { - GetProfileSender()->SendToProfileProject(GetProfileSender()->GetDefaultProfileRegion(), *logGroup); - } else { - GetProfileSender()->SendToProfileProject(iter->first, *logGroup); - } - delete logGroup; - } -} - -void MetricExportor::SendToLocalFile(std::string& metricsContent, const std::string metricsFileNamePrefix) { - const std::string metricsDirName = "self_metrics"; - const size_t maxFiles = 60; // 每分钟记录一次,最多保留1h的记录 - - if (!metricsContent.empty()) { - // 创建输出目录(如果不存在) - std::string outputDirectory = GetAgentLogDir() + metricsDirName; - Mkdirs(outputDirectory); - - std::vector metricFiles; - - for (const auto& entry : std::filesystem::directory_iterator(outputDirectory)) { - if (entry.is_regular_file() && entry.path().filename().string().find(metricsFileNamePrefix) == 0) { - metricFiles.push_back(entry.path()); - } - } - - // 删除多余的文件 - if (metricFiles.size() > maxFiles) { - std::sort(metricFiles.begin(), - metricFiles.end(), - [](const std::filesystem::path& a, const std::filesystem::path& b) { - return std::filesystem::last_write_time(a) > std::filesystem::last_write_time(b); - }); - - for (size_t i = maxFiles; i < metricFiles.size(); ++i) { - std::filesystem::remove(metricFiles[i]); - } - } - - // 生成文件名 - auto now = std::chrono::system_clock::now(); - std::time_t now_time = std::chrono::system_clock::to_time_t(now); - std::tm now_tm = *std::localtime(&now_time); - std::ostringstream oss; - oss << metricsFileNamePrefix << std::put_time(&now_tm, "-%Y-%m-%d_%H-%M-%S") << ".json"; - std::string filePath = PathJoin(outputDirectory, oss.str()); - - // 写入文件 - std::ofstream outFile(filePath); - if (!outFile) { - LOG_ERROR(sLogger, ("Open file fail when print metrics", filePath.c_str())); - } else { - outFile << metricsContent; - outFile.close(); - } - } -} - -// metrics from Go that are directly outputted -void MetricExportor::PushGoDirectMetrics(std::vector>& metricsList) { - if (metricsList.size() == 0) { - return; - } - - if ("sls" == STRING_FLAG(metrics_report_method)) { - std::map logGroupMap; - SerializeGoDirectMetricsListToLogGroupMap(metricsList, logGroupMap); - SendToSLS(logGroupMap); - } else if ("file" == STRING_FLAG(metrics_report_method)) { - std::string metricsContent; - SerializeGoDirectMetricsListToString(metricsList, metricsContent); - SendToLocalFile(metricsContent, "self-metrics-go"); - } -} - -// metrics from Go that are provided by cpp -void MetricExportor::PushGoCppProvidedMetrics(std::vector>& metricsList) { - if (metricsList.size() == 0) { - return; - } - - for (auto metrics : metricsList) { - for (auto metric : metrics) { - if (metric.first == METRIC_KEY_VALUE + "." + METRIC_AGENT_MEMORY_GO) { - LoongCollectorMonitor::GetInstance()->SetAgentGoMemory(std::stoi(metric.second)); - } - if (metric.first == METRIC_KEY_VALUE + "." + METRIC_AGENT_GO_ROUTINES_TOTAL) { - LoongCollectorMonitor::GetInstance()->SetAgentGoRoutinesTotal(std::stoi(metric.second)); - } - LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second); - } - } -} - -void MetricExportor::SerializeGoDirectMetricsListToLogGroupMap( - std::vector>& metricsList, - std::map& logGroupMap) { - for (auto& metrics : metricsList) { - std::string configName = ""; - std::string region = METRIC_REGION_DEFAULT; - { - // get the pipeline_name label - for (const auto& metric : metrics) { - if (metric.first == METRIC_KEY_LABEL + "." + METRIC_LABEL_KEY_PIPELINE_NAME) { - configName = metric.second; - break; - } - } - if (!configName.empty()) { - // get region info by pipeline_name - shared_ptr p = PipelineManager::GetInstance()->FindConfigByName(configName); - if (p) { - FlusherSLS* pConfig = NULL; - pConfig = const_cast(static_cast(p->GetFlushers()[0]->GetPlugin())); - if (pConfig) { - region = pConfig->mRegion; - } - } - } - } - Log* logPtr = nullptr; - auto LogGroupIter = logGroupMap.find(region); - if (LogGroupIter != logGroupMap.end()) { - sls_logs::LogGroup* logGroup = LogGroupIter->second; - logPtr = logGroup->add_logs(); - } else { - sls_logs::LogGroup* logGroup = new sls_logs::LogGroup(); - logPtr = logGroup->add_logs(); - logGroupMap.insert(std::pair(region, logGroup)); - } - auto now = GetCurrentLogtailTime(); - SetLogTime(logPtr, - AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec); - - Json::Value metricsRecordLabel; - for (const auto& metric : metrics) { - // category - if (metric.first.compare("label.metric_category") == 0) { - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(METRIC_KEY_CATEGORY); - contentPtr->set_value(metric.second); - continue; - } - // label - if (metric.first.compare(0, METRIC_KEY_LABEL.length(), METRIC_KEY_LABEL)) { - metricsRecordLabel[metric.first.substr(METRIC_KEY_LABEL.length() + 1)] = metric.second; - continue; - } - // value - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(metric.first); - contentPtr->set_value(metric.second); - } - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - std::string jsonString = Json::writeString(writer, metricsRecordLabel); - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(METRIC_KEY_LABEL); - contentPtr->set_value(jsonString); - } -} - -void MetricExportor::SerializeGoDirectMetricsListToString(std::vector>& metricsList, - std::string& metricsContent) { - std::ostringstream oss; - - for (auto& metrics : metricsList) { - Json::Value metricsRecordJson, metricsRecordLabel; - auto now = GetCurrentLogtailTime(); - metricsRecordJson["time"] - = AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec; - for (const auto& metric : metrics) { - if (metric.first.compare("label.metric_category") == 0) { - metricsRecordJson[METRIC_KEY_CATEGORY] = metric.second; - continue; - } - if (metric.first.compare(0, METRIC_KEY_LABEL.length(), METRIC_KEY_LABEL) == 0) { - metricsRecordLabel[metric.first.substr(METRIC_KEY_LABEL.length() + 1)] = metric.second; - continue; - } - metricsRecordJson[metric.first.substr(METRIC_KEY_VALUE.length() + 1)] = metric.second; - } - metricsRecordJson[METRIC_KEY_LABEL] = metricsRecordLabel; - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - std::string jsonString = Json::writeString(writer, metricsRecordJson); - oss << jsonString << '\n'; - } - metricsContent = oss.str(); -} - -} // namespace logtail \ No newline at end of file diff --git a/core/monitor/MetricExportor.h b/core/monitor/MetricExportor.h deleted file mode 100644 index d40518ad12..0000000000 --- a/core/monitor/MetricExportor.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2023 iLogtail Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "Monitor.h" -#include "provider/Provider.h" - - -namespace logtail { - -class MetricExportor { -public: - static MetricExportor* GetInstance() { - static MetricExportor* ptr = new MetricExportor(); - return ptr; - } - void PushMetrics(bool forceSend); - -private: - MetricExportor(); - - void PushCppMetrics(); - void PushGoMetrics(); - - // Send Methods - void SendToSLS(std::map& logGroupMap); - void SendToLocalFile(std::string& metricsContent, const std::string metricsFileNamePrefix); - - // go metrics - void PushGoDirectMetrics(std::vector>& metricsList); - void PushGoCppProvidedMetrics(std::vector>& metricsList); - void SerializeGoDirectMetricsListToLogGroupMap(std::vector>& metricsList, - std::map& logGroupMap); - void SerializeGoDirectMetricsListToString(std::vector>& metricsList, - std::string& metricsContent); - - int32_t mSendInterval; - int32_t mLastSendTime; -}; - -} // namespace logtail \ No newline at end of file diff --git a/core/monitor/MetricManager.cpp b/core/monitor/MetricManager.cpp index 4b0dada653..be425a9ce7 100644 --- a/core/monitor/MetricManager.cpp +++ b/core/monitor/MetricManager.cpp @@ -14,16 +14,183 @@ #include "MetricManager.h" +#include + +#include "Monitor.h" #include "app_config/AppConfig.h" +#include "common/HashUtil.h" +#include "common/JsonUtil.h" #include "common/StringTools.h" #include "common/TimeUtil.h" +#include "go_pipeline/LogtailPlugin.h" #include "logger/Logger.h" +#include "provider/Provider.h" using namespace sls_logs; +using namespace std; namespace logtail { +const string METRIC_KEY_CATEGORY = "category"; +const string METRIC_KEY_LABEL = "label"; +const string METRIC_TOPIC_TYPE = "loongcollector_metric"; +const string METRIC_EXPORT_TYPE_GO = "direct"; +const string METRIC_EXPORT_TYPE_CPP = "cpp_provided"; +const string METRIC_GO_KEY_LABELS = "labels"; +const string METRIC_GO_KEY_COUNTERS = "counters"; +const string METRIC_GO_KEY_GAUGES = "gauges"; + +SelfMonitorMetricEvent::SelfMonitorMetricEvent() { +} + +SelfMonitorMetricEvent::SelfMonitorMetricEvent(MetricsRecord* metricRecord) { + // category + mCategory = metricRecord->GetCategory(); + // labels + for (auto item = metricRecord->GetLabels()->begin(); item != metricRecord->GetLabels()->end(); ++item) { + pair pair = *item; + mLabels[pair.first] = pair.second; + } + for (auto item = metricRecord->GetDynamicLabels()->begin(); item != metricRecord->GetDynamicLabels()->end(); + ++item) { + pair> pair = *item; + string value = pair.second(); + mLabels[pair.first] = value; + } + // counters + for (auto& item : metricRecord->GetCounters()) { + mCounters[item->GetName()] = item->GetValue(); + } + for (auto& item : metricRecord->GetTimeCounters()) { + mCounters[item->GetName()] = item->GetValue(); + } + // gauges + for (auto& item : metricRecord->GetIntGauges()) { + mGauges[item->GetName()] = item->GetValue(); + } + for (auto& item : metricRecord->GetDoubleGauges()) { + mGauges[item->GetName()] = item->GetValue(); + } + CreateKey(); +} + +SelfMonitorMetricEvent::SelfMonitorMetricEvent(const std::map& metricRecord) { + Json::Value labels, counters, gauges; + string errMsg; + ParseJsonTable(metricRecord.at(METRIC_GO_KEY_LABELS), labels, errMsg); + ParseJsonTable(metricRecord.at(METRIC_GO_KEY_COUNTERS), counters, errMsg); + ParseJsonTable(metricRecord.at(METRIC_GO_KEY_GAUGES), gauges, errMsg); + // category + if (labels.isMember("metric_category")) { + mCategory = labels["metric_category"].asString(); + labels.removeMember("metric_category"); + } else { + mCategory = MetricCategory::METRIC_CATEGORY_UNKNOWN; + LOG_ERROR(sLogger, ("parse go metric", "labels")("err", "metric_category not found")); + } + // labels + for (Json::Value::const_iterator itr = labels.begin(); itr != labels.end(); ++itr) { + if (itr->isString()) { + mLabels[itr.key().asString()] = itr->asString(); + } + } + // counters + for (Json::Value::const_iterator itr = counters.begin(); itr != counters.end(); ++itr) { + if (itr->isUInt64()) { + mCounters[itr.key().asString()] = itr->asUInt64(); + } + if (itr->isDouble()) { + mCounters[itr.key().asString()] = static_cast(itr->asDouble()); + } + if (itr->isString()) { + try { + mCounters[itr.key().asString()] = static_cast(std::stod(itr->asString())); + } catch (...) { + mCounters[itr.key().asString()] = 0; + } + } + } + // gauges + for (Json::Value::const_iterator itr = gauges.begin(); itr != gauges.end(); ++itr) { + if (itr->isDouble()) { + mGauges[itr.key().asString()] = itr->asDouble(); + } + if (itr->isString()) { + try { + double value = std::stod(itr->asString()); + mGauges[itr.key().asString()] = value; + } catch (...) { + mGauges[itr.key().asString()] = 0; + } + } + } + CreateKey(); +} + +void SelfMonitorMetricEvent::CreateKey() { + string key = "category:" + mCategory; + for (auto label : mLabels) { + key += (";" + label.first + ":" + label.second); + } + mKey = HashString(key); + mUpdatedFlag = true; +} + +void SelfMonitorMetricEvent::SetInterval(size_t interval) { + mLastSendInterval = 0; + mSendInterval = interval; +} + +void SelfMonitorMetricEvent::Merge(SelfMonitorMetricEvent& event) { + if (mSendInterval != event.mSendInterval) { + mSendInterval = event.mSendInterval; + mLastSendInterval = 0; + } + for (auto counter = event.mCounters.begin(); counter != event.mCounters.end(); counter++) { + if (mCounters.find(counter->first) != mCounters.end()) + mCounters[counter->first] += counter->second; + else + mCounters[counter->first] = counter->second; + } + for (auto gauge = event.mGauges.begin(); gauge != event.mGauges.end(); gauge++) { + mGauges[gauge->first] = gauge->second; + } + mUpdatedFlag = true; +} + +bool SelfMonitorMetricEvent::ShouldSend() { + mLastSendInterval++; + return (mLastSendInterval >= mSendInterval) && mUpdatedFlag; +} + +bool SelfMonitorMetricEvent::ShouldDelete() { + return (mLastSendInterval >= mSendInterval) && !mUpdatedFlag; +} + +void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) { + // time + metricEventPtr->SetTimestamp(GetCurrentLogtailTime().tv_sec); + // __tag__ + for (auto label = mLabels.begin(); label != mLabels.end(); label++) { + metricEventPtr->SetTag(label->first, label->second); + } + // name + metricEventPtr->SetName(mCategory); + // values + metricEventPtr->SetValue({}); + for (auto counter = mCounters.begin(); counter != mCounters.end(); counter++) { + metricEventPtr->MutableValue()->SetValue(counter->first, counter->second); + counter->second = 0; + } + for (auto gauge = mGauges.begin(); gauge != mGauges.end(); gauge++) { + metricEventPtr->MutableValue()->SetValue(gauge->first, gauge->second); + } + // set flags + mLastSendInterval = 0; + mUpdatedFlag = false; +} + WriteMetrics::~WriteMetrics() { Clear(); } @@ -150,149 +317,36 @@ ReadMetrics::~ReadMetrics() { Clear(); } -void ReadMetrics::ReadAsLogGroup(const std::string& regionFieldName, - const std::string& defaultRegion, - std::map& logGroupMap) const { +void ReadMetrics::ReadAsSelfMonitorMetricEvents(std::vector& metricEventList) const { ReadLock lock(mReadWriteLock); + // c++ metrics MetricsRecord* tmp = mHead; while (tmp) { - Log* logPtr = nullptr; - - for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) { - std::pair pair = *item; - if (regionFieldName == pair.first) { - std::map::iterator iter; - std::string region = pair.second; - iter = logGroupMap.find(region); - if (iter != logGroupMap.end()) { - sls_logs::LogGroup* logGroup = iter->second; - logPtr = logGroup->add_logs(); - } else { - sls_logs::LogGroup* logGroup = new sls_logs::LogGroup(); - logPtr = logGroup->add_logs(); - logGroupMap.insert(std::pair(region, logGroup)); - } - } - } - if (!logPtr) { - std::map::iterator iter; - iter = logGroupMap.find(defaultRegion); - if (iter != logGroupMap.end()) { - sls_logs::LogGroup* logGroup = iter->second; - logPtr = logGroup->add_logs(); - } else { - sls_logs::LogGroup* logGroup = new sls_logs::LogGroup(); - logPtr = logGroup->add_logs(); - logGroupMap.insert(std::pair(defaultRegion, logGroup)); - } - } - auto now = GetCurrentLogtailTime(); - SetLogTime(logPtr, - AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec); - { // category - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(METRIC_KEY_CATEGORY); - contentPtr->set_value(tmp->GetCategory()); - } - { // label - Json::Value metricsRecordLabel; - for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) { - std::pair pair = *item; - metricsRecordLabel[pair.first] = pair.second; - } - for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) { - std::pair> pair = *item; - metricsRecordLabel[pair.first] = pair.second(); - } - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - std::string jsonString = Json::writeString(writer, metricsRecordLabel); - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(METRIC_KEY_LABEL); - contentPtr->set_value(jsonString); - } - { // value - for (auto& item : tmp->GetCounters()) { - CounterPtr counter = item; - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(counter->GetName()); - contentPtr->set_value(ToString(counter->GetValue())); - } - for (auto& item : tmp->GetTimeCounters()) { - TimeCounterPtr counter = item; - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(counter->GetName()); - contentPtr->set_value(ToString(counter->GetValue())); - } - for (auto& item : tmp->GetIntGauges()) { - IntGaugePtr gauge = item; - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(gauge->GetName()); - contentPtr->set_value(ToString(gauge->GetValue())); - } - for (auto& item : tmp->GetDoubleGauges()) { - DoubleGaugePtr gauge = item; - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(gauge->GetName()); - contentPtr->set_value(ToString(gauge->GetValue())); - } - } + metricEventList.emplace_back(SelfMonitorMetricEvent(tmp)); tmp = tmp->GetNext(); } -} - -void ReadMetrics::ReadAsFileBuffer(std::string& metricsContent) const { - ReadLock lock(mReadWriteLock); - - std::ostringstream oss; - - MetricsRecord* tmp = mHead; - while (tmp) { - Json::Value metricsRecordJson, metricsRecordLabel; - auto now = GetCurrentLogtailTime(); - metricsRecordJson["time"] - = AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec; - - metricsRecordJson[METRIC_KEY_CATEGORY] = tmp->GetCategory(); - - for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) { - std::pair pair = *item; - metricsRecordLabel[pair.first] = pair.second; - } - for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) { - std::pair> pair = *item; - metricsRecordLabel[pair.first] = pair.second(); - } - metricsRecordJson[METRIC_KEY_LABEL] = metricsRecordLabel; - - for (auto& item : tmp->GetCounters()) { - CounterPtr counter = item; - metricsRecordJson[counter->GetName()] = ToString(counter->GetValue()); - } - for (auto& item : tmp->GetTimeCounters()) { - TimeCounterPtr counter = item; - metricsRecordJson[counter->GetName()] = ToString(counter->GetValue()); - } - for (auto& item : tmp->GetIntGauges()) { - IntGaugePtr gauge = item; - metricsRecordJson[gauge->GetName()] = ToString(gauge->GetValue()); - } - for (auto& item : tmp->GetDoubleGauges()) { - DoubleGaugePtr gauge = item; - metricsRecordJson[gauge->GetName()] = ToString(gauge->GetValue()); - } - - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - std::string jsonString = Json::writeString(writer, metricsRecordJson); - oss << jsonString << '\n'; - - tmp = tmp->GetNext(); + // go metrics + for (auto metrics : mGoMetrics) { + metricEventList.emplace_back(SelfMonitorMetricEvent(move(metrics))); } - metricsContent = oss.str(); } void ReadMetrics::UpdateMetrics() { + // go指标在Cpp指标前获取,是为了在 Cpp 部分指标做 SnapShot + // 前(即调用 ReadMetrics::GetInstance()->UpdateMetrics() 函数),把go部分的进程级指标填写到 Cpp + // 的进程级指标中去,随Cpp的进程级指标一起输出 + if (LogtailPlugin::GetInstance()->IsPluginOpened()) { + vector> goCppProvidedMetircsList; + LogtailPlugin::GetInstance()->GetGoMetrics(goCppProvidedMetircsList, METRIC_EXPORT_TYPE_CPP); + UpdateGoCppProvidedMetrics(goCppProvidedMetircsList); + + { + WriteLock lock(mReadWriteLock); + mGoMetrics.clear(); + LogtailPlugin::GetInstance()->GetGoMetrics(mGoMetrics, METRIC_EXPORT_TYPE_GO); + } + } + // 获取c++指标 MetricsRecord* snapshot = WriteMetrics::GetInstance()->DoSnapshot(); MetricsRecord* toDelete; { @@ -323,4 +377,23 @@ void ReadMetrics::Clear() { } } +// metrics from Go that are provided by cpp +void ReadMetrics::UpdateGoCppProvidedMetrics(vector>& metricsList) { + if (metricsList.size() == 0) { + return; + } + + for (auto metrics : metricsList) { + for (auto metric : metrics) { + if (metric.first == METRIC_AGENT_MEMORY_GO) { + LoongCollectorMonitor::GetInstance()->SetAgentGoMemory(stoi(metric.second)); + } + if (metric.first == METRIC_AGENT_GO_ROUTINES_TOTAL) { + LoongCollectorMonitor::GetInstance()->SetAgentGoRoutinesTotal(stoi(metric.second)); + } + LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second); + } + } +} + } // namespace logtail diff --git a/core/monitor/MetricManager.h b/core/monitor/MetricManager.h index 1d54b1cfce..e87a022da1 100644 --- a/core/monitor/MetricManager.h +++ b/core/monitor/MetricManager.h @@ -24,10 +24,55 @@ #include "MetricRecord.h" #include "common/Lock.h" +#include "models/PipelineEventGroup.h" #include "protobuf/sls/sls_logs.pb.h" namespace logtail { +extern const std::string METRIC_TOPIC_TYPE; + +struct SelfMonitorMetricRule { + bool mEnable; + size_t mInterval; +}; + +struct SelfMonitorMetricRules { + SelfMonitorMetricRule mAgentMetricsRule; + SelfMonitorMetricRule mRunnerMetricsRule; + SelfMonitorMetricRule mPipelineMetricsRule; + SelfMonitorMetricRule mPluginSourceMetricsRule; + SelfMonitorMetricRule mPluginMetricsRule; + SelfMonitorMetricRule mComponentMetricsRule; +}; + +using SelfMonitorMetricEventKey = int64_t; +class SelfMonitorMetricEvent { +public: + SelfMonitorMetricEvent(); + SelfMonitorMetricEvent(MetricsRecord* metricRecord); + SelfMonitorMetricEvent(const std::map& metricRecord); + + void SetInterval(size_t interval); + void Merge(SelfMonitorMetricEvent& event); + + bool ShouldSend(); + bool ShouldDelete(); + void ReadAsMetricEvent(MetricEvent* metricEventPtr); + + SelfMonitorMetricEventKey mKey; // labels + category + std::string mCategory; // category +private: + void CreateKey(); + + std::unordered_map mLabels; + std::unordered_map mCounters; + std::unordered_map mGauges; + int32_t mSendInterval; + int32_t mLastSendInterval; + bool mUpdatedFlag; +}; +using SelfMonitorMetricEventMap = std::unordered_map; + class WriteMetrics { private: WriteMetrics() = default; @@ -66,8 +111,10 @@ class ReadMetrics { ReadMetrics() = default; mutable ReadWriteLock mReadWriteLock; MetricsRecord* mHead = nullptr; + std::vector> mGoMetrics; void Clear(); MetricsRecord* GetHead(); + void UpdateGoCppProvidedMetrics(std::vector>& metricsList); public: ~ReadMetrics(); @@ -75,10 +122,7 @@ class ReadMetrics { static ReadMetrics* ptr = new ReadMetrics(); return ptr; } - void ReadAsLogGroup(const std::string& regionFieldName, - const std::string& defaultRegion, - std::map& logGroupMap) const; - void ReadAsFileBuffer(std::string& metricsContent) const; + void ReadAsSelfMonitorMetricEvents(std::vector& metricEventList) const; void UpdateMetrics(); #ifdef APSARA_UNIT_TEST_MAIN diff --git a/core/monitor/MetricRecord.cpp b/core/monitor/MetricRecord.cpp index 8e27533db5..cf8ca66d24 100644 --- a/core/monitor/MetricRecord.cpp +++ b/core/monitor/MetricRecord.cpp @@ -18,9 +18,6 @@ namespace logtail { -const std::string METRIC_KEY_LABEL = "label"; -const std::string METRIC_KEY_VALUE = "value"; -const std::string METRIC_KEY_CATEGORY = "category"; const std::string MetricCategory::METRIC_CATEGORY_UNKNOWN = "unknown"; const std::string MetricCategory::METRIC_CATEGORY_AGENT = "agent"; const std::string MetricCategory::METRIC_CATEGORY_RUNNER = "runner"; diff --git a/core/monitor/MetricRecord.h b/core/monitor/MetricRecord.h index 487be8a65c..4a6e435eea 100644 --- a/core/monitor/MetricRecord.h +++ b/core/monitor/MetricRecord.h @@ -19,9 +19,6 @@ namespace logtail { -extern const std::string METRIC_KEY_LABEL; -extern const std::string METRIC_KEY_VALUE; -extern const std::string METRIC_KEY_CATEGORY; class MetricCategory { public: static const std::string METRIC_CATEGORY_UNKNOWN; diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 0123fee786..21a51952de 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -37,7 +37,7 @@ #include "go_pipeline/LogtailPlugin.h" #include "logger/Logger.h" #include "monitor/AlarmManager.h" -#include "monitor/MetricExportor.h" +#include "monitor/SelfMonitorServer.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "protobuf/sls/sls_logs.pb.h" #include "runner/FlusherRunner.h" @@ -721,6 +721,9 @@ LoongCollectorMonitor::~LoongCollectorMonitor() { } void LoongCollectorMonitor::Init() { + LOG_INFO(sLogger, ("LoongCollector monitor", "started")); + SelfMonitorServer::GetInstance()->Init(); + // create metric record MetricLabels labels; labels.emplace_back(METRIC_LABEL_KEY_INSTANCE_ID, Application::GetInstance()->GetInstanceId()); @@ -751,7 +754,56 @@ void LoongCollectorMonitor::Init() { } void LoongCollectorMonitor::Stop() { - MetricExportor::GetInstance()->PushMetrics(true); + SelfMonitorServer::GetInstance()->Stop(); + LOG_INFO(sLogger, ("LoongCollector monitor", "stopped successfully")); + +} + +const string LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline() { +#ifdef __ENTERPRISE__ + static string pipeline = ""; +#else + static string pipeline = R"( + { + "inputs": [ + { + "Type": "input_internal_metrics", + "Agent": { + "Enable": false, + "Interval": 1 + }, + "Runner": { + "Enable": false, + "Interval": 1 + }, + "Pipeline": { + "Enable": true, + "Interval": 1 + }, + "PluginSource": { + "Enable": true, + "Interval": 10 + }, + "Plugin": { + "Enable": false, + "Interval": 10 + }, + "Component": { + "Enable": false, + "Interval": 10 + } + } + ], + "flushers": [ + { + "Type": "flusher_file", + "FilePath": "./log/self_metrics.log" + } + ] + } + )"; +#endif + return pipeline; } } // namespace logtail diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index c66a47218e..372407d8f1 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -192,6 +192,11 @@ class LoongCollectorMonitor { void Init(); void Stop(); + static const std::string GetInnerSelfMonitorAlarmPipelineName() { return ""; } + static const std::string GetInnerSelfMonitorAlarmPipeline() { return ""; } + static const std::string GetInnerSelfMonitorMetricPipelineName() { return "inner-self-monitor-metric-pipeline"; } + static const std::string GetInnerSelfMonitorMetricPipeline(); + void SetAgentCpu(double cpu) { mAgentCpu->Set(cpu); } void SetAgentMemory(uint64_t mem) { mAgentMemory->Set(mem); } void SetAgentGoMemory(uint64_t mem) { mAgentGoMemory->Set(mem); } diff --git a/core/monitor/PluginMetricManager.cpp b/core/monitor/PluginMetricManager.cpp index 25a6ff41a1..58719241d9 100644 --- a/core/monitor/PluginMetricManager.cpp +++ b/core/monitor/PluginMetricManager.cpp @@ -31,6 +31,7 @@ void ReentrantMetricsRecord::Init(const std::string& category, break; case MetricType::METRIC_TYPE_TIME_COUNTER: mTimeCounters[metric.first] = mMetricsRecordRef.CreateTimeCounter(metric.first); + break; case MetricType::METRIC_TYPE_INT_GAUGE: mIntGauges[metric.first] = mMetricsRecordRef.CreateIntGauge(metric.first); break; diff --git a/core/monitor/SelfMonitorServer.cpp b/core/monitor/SelfMonitorServer.cpp new file mode 100644 index 0000000000..690c0beca5 --- /dev/null +++ b/core/monitor/SelfMonitorServer.cpp @@ -0,0 +1,177 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SelfMonitorServer.h" + +#include "Monitor.h" +#include "PipelineManager.h" +#include "common/LogtailCommonFlags.h" +#include "runner/ProcessorRunner.h" + +using namespace std; + +namespace logtail { + +SelfMonitorServer::SelfMonitorServer() { +} + +SelfMonitorServer* SelfMonitorServer::GetInstance() { + static SelfMonitorServer* ptr = new SelfMonitorServer(); + return ptr; +} + +void SelfMonitorServer::Init() { + mThreadRes = async(launch::async, &SelfMonitorServer::Monitor, this); +} + +void SelfMonitorServer::Monitor() { + LOG_INFO(sLogger, ("self-monitor", "started")); + int32_t lastMonitorTime = time(NULL); + { + unique_lock lock(mThreadRunningMux); + while (mIsThreadRunning) { + if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) { + break; + } + int32_t monitorTime = time(NULL); + if ((monitorTime - lastMonitorTime) >= 60) { // 60s + lastMonitorTime = monitorTime; + SendMetrics(); + SendAlarms(); + } + } + } + SendMetrics(); + SendAlarms(); +} + +void SelfMonitorServer::Stop() { + { + lock_guard lock(mThreadRunningMux); + mIsThreadRunning = false; + } + mStopCV.notify_one(); + future_status s = mThreadRes.wait_for(chrono::seconds(1)); + if (s == future_status::ready) { + LOG_INFO(sLogger, ("self-monitor", "stopped successfully")); + } else { + LOG_WARNING(sLogger, ("self-monitor", "forced to stopped")); + } +} + +void SelfMonitorServer::UpdateMetricPipeline(PipelineContext* ctx, SelfMonitorMetricRules* rules) { + WriteLock lock(mMetricPipelineLock); + mMetricPipelineCtx = ctx; + mSelfMonitorMetricRules = rules; + LOG_INFO(sLogger, ("self-monitor metrics pipeline", "updated")); +} + +void SelfMonitorServer::RemoveMetricPipeline() { + WriteLock lock(mMetricPipelineLock); + mMetricPipelineCtx = nullptr; + mSelfMonitorMetricRules = nullptr; + LOG_INFO(sLogger, ("self-monitor metrics pipeline", "removed")); +} + +void SelfMonitorServer::SendMetrics() { + ReadMetrics::GetInstance()->UpdateMetrics(); + + ReadLock lock(mMetricPipelineLock); + if (mMetricPipelineCtx == nullptr || mSelfMonitorMetricRules == nullptr) { + return; + } + // new pipeline + vector metricEventList; + ReadMetrics::GetInstance()->ReadAsSelfMonitorMetricEvents(metricEventList); + PushSelfMonitorMetricEvents(metricEventList); + + PipelineEventGroup pipelineEventGroup(std::make_shared()); + pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr); + pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_TOPIC, METRIC_TOPIC_TYPE); + ReadAsPipelineEventGroup(pipelineEventGroup); + + shared_ptr pipeline + = PipelineManager::GetInstance()->FindConfigByName(mMetricPipelineCtx->GetConfigName()); + if (pipeline.get() != nullptr) { + if (pipelineEventGroup.GetEvents().size() > 0) { + ProcessorRunner::GetInstance()->PushQueue( + pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup)); + } + } +} + +bool SelfMonitorServer::ProcessSelfMonitorMetricEvent(SelfMonitorMetricEvent& event, + const SelfMonitorMetricRule& rule) { + if (!rule.mEnable) { + if (mSelfMonitorMetricEventMap.find(event.mKey) != mSelfMonitorMetricEventMap.end()) { + mSelfMonitorMetricEventMap.erase(event.mKey); + } + return false; + } + event.SetInterval(rule.mInterval); + return true; +} + +void SelfMonitorServer::PushSelfMonitorMetricEvents(std::vector& events) { + for (auto event : events) { + bool shouldSkip = false; + if (event.mCategory == MetricCategory::METRIC_CATEGORY_AGENT) { + shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mAgentMetricsRule); + } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_RUNNER) { + shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mRunnerMetricsRule); + } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_COMPONENT) { + shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mComponentMetricsRule); + } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_PIPELINE) { + shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mPipelineMetricsRule); + } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_PLUGIN) { + shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mPluginMetricsRule); + } else if (event.mCategory == MetricCategory::METRIC_CATEGORY_PLUGIN_SOURCE) { + shouldSkip = !ProcessSelfMonitorMetricEvent(event, mSelfMonitorMetricRules->mPluginSourceMetricsRule); + } + if (shouldSkip) + continue; + + if (mSelfMonitorMetricEventMap.find(event.mKey) != mSelfMonitorMetricEventMap.end()) { + mSelfMonitorMetricEventMap[event.mKey].Merge(event); + } else { + mSelfMonitorMetricEventMap[event.mKey] = std::move(event); + } + } +} + +void SelfMonitorServer::ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEventGroup) { + for (auto event = mSelfMonitorMetricEventMap.begin(); event != mSelfMonitorMetricEventMap.end();) { + if (event->second.ShouldSend()) { + MetricEvent* metricEventPtr = pipelineEventGroup.AddMetricEvent(); + event->second.ReadAsMetricEvent(metricEventPtr); + } + if (event->second.ShouldDelete()) { + event = mSelfMonitorMetricEventMap.erase(event); + } else { + event++; + } + } +} + +void SelfMonitorServer::UpdateAlarmPipeline(PipelineContext* ctx) { + lock_guard lock(mAlarmPipelineMux); + mAlarmPipelineCtx = ctx; +} + +void SelfMonitorServer::SendAlarms() { +} + +} // namespace logtail \ No newline at end of file diff --git a/core/monitor/SelfMonitorServer.h b/core/monitor/SelfMonitorServer.h new file mode 100644 index 0000000000..cea3448b0c --- /dev/null +++ b/core/monitor/SelfMonitorServer.h @@ -0,0 +1,61 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "Pipeline.h" + +namespace logtail { + +class SelfMonitorServer { +public: + SelfMonitorServer(const SelfMonitorServer&) = delete; + SelfMonitorServer& operator=(const SelfMonitorServer&) = delete; + static SelfMonitorServer* GetInstance(); + + void Init(); + void Monitor(); + void Stop(); + + void UpdateMetricPipeline(PipelineContext* ctx, SelfMonitorMetricRules* rules); + void RemoveMetricPipeline(); + void UpdateAlarmPipeline(PipelineContext* ctx); // Todo +private: + SelfMonitorServer(); + ~SelfMonitorServer() = default; + + std::future mThreadRes; + std::mutex mThreadRunningMux; + bool mIsThreadRunning = true; + std::condition_variable mStopCV; + + void SendMetrics(); + bool ProcessSelfMonitorMetricEvent(SelfMonitorMetricEvent& event, const SelfMonitorMetricRule& rule); + void PushSelfMonitorMetricEvents(std::vector& events); + void ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEventGroup); + + PipelineContext* mMetricPipelineCtx = nullptr; + SelfMonitorMetricRules* mSelfMonitorMetricRules = nullptr; + SelfMonitorMetricEventMap mSelfMonitorMetricEventMap; + mutable ReadWriteLock mMetricPipelineLock; + + void SendAlarms(); + + PipelineContext* mAlarmPipelineCtx; + std::mutex mAlarmPipelineMux; +}; + +} // namespace logtail \ No newline at end of file diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index 9e684c14a6..bf0d7acbe6 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -26,12 +26,14 @@ #include "app_config/AppConfig.h" #include "common/Flags.h" #include "plugin/flusher/blackhole/FlusherBlackHole.h" +#include "plugin/flusher/file/FlusherFile.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/input/InputContainerStdio.h" #include "plugin/input/InputFile.h" #include "plugin/input/InputPrometheus.h" #if defined(__linux__) && !defined(__ANDROID__) #include "plugin/input/InputFileSecurity.h" +#include "plugin/input/InputInternalMetrics.h" #include "plugin/input/InputNetworkObserver.h" #include "plugin/input/InputNetworkSecurity.h" #include "plugin/input/InputProcessSecurity.h" @@ -49,9 +51,9 @@ #include "plugin/processor/ProcessorParseJsonNative.h" #include "plugin/processor/ProcessorParseRegexNative.h" #include "plugin/processor/ProcessorParseTimestampNative.h" -#include "plugin/processor/inner/ProcessorPromParseMetricNative.h" #include "plugin/processor/inner/ProcessorMergeMultilineLogNative.h" #include "plugin/processor/inner/ProcessorParseContainerLogNative.h" +#include "plugin/processor/inner/ProcessorPromParseMetricNative.h" #include "plugin/processor/inner/ProcessorPromRelabelMetricNative.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" @@ -125,6 +127,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const { void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator()); #if defined(__linux__) && !defined(__ANDROID__) RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); @@ -156,6 +159,7 @@ void PluginRegistry::LoadStaticPlugins() { RegisterFlusherCreator(new StaticFlusherCreator()); RegisterFlusherCreator(new StaticFlusherCreator()); + RegisterFlusherCreator(new StaticFlusherCreator()); } void PluginRegistry::LoadDynamicPlugins(const set& plugins) { @@ -220,7 +224,8 @@ void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) { mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr(creator)); } -unique_ptr PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) { +unique_ptr +PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) { unique_ptr ins; auto creatorEntry = mPluginDict.find(PluginKey(cat, name)); if (creatorEntry != mPluginDict.end()) { diff --git a/core/pipeline/serializer/JsonSerializer.cpp b/core/pipeline/serializer/JsonSerializer.cpp new file mode 100644 index 0000000000..c83b45e282 --- /dev/null +++ b/core/pipeline/serializer/JsonSerializer.cpp @@ -0,0 +1,114 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "pipeline/serializer/JsonSerializer.h" + +#include "constants/SpanConstants.h" +#include "protobuf/sls/LogGroupSerializer.h" + +using namespace std; + +namespace logtail { + +const string JSON_KEY_TIME = "__time__"; + +bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) { + if (group.mEvents.empty()) { + errorMsg = "empty event group"; + return false; + } + + PipelineEvent::Type eventType = group.mEvents[0]->GetType(); + if (eventType == PipelineEvent::Type::NONE) { + // should not happen + errorMsg = "unsupported event type in event group"; + return false; + } + + Json::Value groupTags; + for (const auto& tag : group.mTags.mInner) { + groupTags[tag.first.to_string()] = tag.second.to_string(); + } + + std::ostringstream oss; + switch (eventType) { + case PipelineEvent::Type::LOG: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + Json::Value eventJson; + // tags + eventJson.copy(groupTags); + // time + eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + // contents + for (const auto& kv : e) { + eventJson[kv.first.to_string()] = kv.second.to_string(); + } + Json::StreamWriterBuilder writer; + writer["indentation"] = ""; + oss << Json::writeString(writer, eventJson); + } + break; + case PipelineEvent::Type::METRIC: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + if (e.Is()) { + continue; + } + Json::Value eventJson; + // tags + eventJson.copy(groupTags); + // time + eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + // __labels__ + eventJson[METRIC_RESERVED_KEY_LABELS] = Json::Value(); + for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) { + eventJson[METRIC_RESERVED_KEY_LABELS][tag->first.to_string()] = tag->second.to_string(); + } + // __name__ + eventJson[METRIC_RESERVED_KEY_NAME] = e.GetName().to_string(); + // __value__ + if (e.Is()) { + eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue()->mValue; + } else if (e.Is()) { + eventJson[METRIC_RESERVED_KEY_VALUE] = Json::Value(); + for (auto value = e.GetValue()->ValusBegin(); + value != e.GetValue()->ValusEnd(); + value++) { + eventJson[METRIC_RESERVED_KEY_VALUE][value->first.to_string()] = value->second; + } + } + Json::StreamWriterBuilder writer; + writer["indentation"] = ""; + oss << Json::writeString(writer, eventJson); + } + break; + case PipelineEvent::Type::SPAN: + LOG_ERROR( + sLogger, + ("invalid event type", "span type is not supported")("config", mFlusher->GetContext().GetConfigName())); + break; + case PipelineEvent::Type::RAW: + LOG_ERROR( + sLogger, + ("invalid event type", "raw type is not supported")("config", mFlusher->GetContext().GetConfigName())); + break; + default: + break; + } + res = oss.str(); + return true; +} + +} // namespace logtail \ No newline at end of file diff --git a/core/pipeline/serializer/JsonSerializer.h b/core/pipeline/serializer/JsonSerializer.h new file mode 100644 index 0000000000..7576af70fc --- /dev/null +++ b/core/pipeline/serializer/JsonSerializer.h @@ -0,0 +1,34 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "pipeline/serializer/Serializer.h" + +namespace logtail { + +class JsonEventGroupSerializer : public Serializer { +public: + JsonEventGroupSerializer(Flusher* f) : Serializer(f) {} + +private: + bool Serialize(BatchedEvents&& p, std::string& res, std::string& errorMsg) override; +}; + +} // namespace logtail diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index 6b9ec3888d..177ddf4fa7 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -14,13 +14,15 @@ #include "pipeline/serializer/SLSSerializer.h" +#include + +#include + #include "common/Flags.h" -#include "constants/SpanConstants.h" #include "common/compression/CompressType.h" +#include "constants/SpanConstants.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "protobuf/sls/LogGroupSerializer.h" -#include -#include DECLARE_FLAG_INT32(max_send_log_group_size); @@ -97,7 +99,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri vector> spanEventContentCache(group.mEvents.size()); size_t logGroupSZ = 0; switch (eventType) { - case PipelineEvent::Type::LOG:{ + case PipelineEvent::Type::LOG: { for (size_t i = 0; i < group.mEvents.size(); ++i) { const auto& e = group.mEvents[i].Cast(); if (e.Empty()) { @@ -111,7 +113,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri } break; } - case PipelineEvent::Type::METRIC:{ + case PipelineEvent::Type::METRIC: { for (size_t i = 0; i < group.mEvents.size(); ++i) { const auto& e = group.mEvents[i].Cast(); if (e.Is()) { @@ -145,9 +147,10 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_PARENT_ID.size(), e.GetParentSpanId().size()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_NAME.size(), e.GetName().size()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_SPAN_KIND.size(), GetKindString(e.GetKind()).size()); - contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size()); + contentSZ + += GetLogContentSize(DEFAULT_TRACE_TAG_STATUS_CODE.size(), GetStatusString(e.GetStatus()).size()); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_TRACE_STATE.size(), e.GetTraceState().size()); - // + // set tags and scope tags Json::Value jsonVal; for (auto it = e.TagsBegin(); it != e.TagsEnd(); ++it) { @@ -160,7 +163,6 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri std::string attrString = Json::writeString(writer, jsonVal); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_ATTRIBUTES.size(), attrString.size()); spanEventContentCache[i][0] = std::move(attrString); - auto linkString = SerializeSpanLinksToString(e); contentSZ += GetLogContentSize(DEFAULT_TRACE_TAG_LINKS.size(), linkString.size()); spanEventContentCache[i][1] = std::move(linkString); @@ -265,7 +267,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri serializer.AddLogContent(DEFAULT_TRACE_TAG_TRACE_STATE, spanEvent.GetTraceState()); serializer.AddLogContent(DEFAULT_TRACE_TAG_ATTRIBUTES, spanEventContentCache[i][0]); - + serializer.AddLogContent(DEFAULT_TRACE_TAG_LINKS, spanEventContentCache[i][1]); serializer.AddLogContent(DEFAULT_TRACE_TAG_EVENTS, spanEventContentCache[i][2]); @@ -275,7 +277,6 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri serializer.AddLogContent(DEFAULT_TRACE_TAG_END_TIME_NANO, spanEventContentCache[i][4]); // duration serializer.AddLogContent(DEFAULT_TRACE_TAG_DURATION, spanEventContentCache[i][5]); - } break; case PipelineEvent::Type::RAW: diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp new file mode 100644 index 0000000000..73eb646547 --- /dev/null +++ b/core/plugin/flusher/file/FlusherFile.cpp @@ -0,0 +1,126 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "plugin/flusher/file/FlusherFile.h" + +#include +#include +#include + +#include "pipeline/queue/SenderQueueManager.h" + +using namespace std; + +namespace logtail { + +const string FlusherFile::sName = "flusher_file"; + +bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + static uint32_t cnt = 0; + GenerateQueueKey(to_string(++cnt)); + SenderQueueManager::GetInstance()->CreateQueue(mQueueKey, mPluginID, *mContext); + + string errorMsg; + // FilePath + if (!GetMandatoryStringParam(config, "FilePath", mFilePath, errorMsg)) { + PARAM_ERROR_RETURN(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + // Pattern + // GetMandatoryStringParam(config, "Pattern", mPattern, errorMsg); + // MaxFileSize + // GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg); + // MaxFiles + // GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg); + + // create file writer + auto file_sink = std::make_shared(mFilePath, mMaxFileSize, mMaxFiles, true); + mFileWriter = std::make_shared( + sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); + mFileWriter->set_pattern(mPattern); + + mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{}); + mGroupSerializer = make_unique(this); + mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); + return true; +} + +bool FlusherFile::Send(PipelineEventGroup&& g) { + if (g.IsReplay()) { + return SerializeAndPush(std::move(g)); + } else { + vector res; + mBatcher.Add(std::move(g), res); + return SerializeAndPush(std::move(res)); + } +} + +bool FlusherFile::Flush(size_t key) { + BatchedEventsList res; + mBatcher.FlushQueue(key, res); + return SerializeAndPush(std::move(res)); +} + +bool FlusherFile::FlushAll() { + vector res; + mBatcher.FlushAll(res); + return SerializeAndPush(std::move(res)); +} + +bool FlusherFile::SerializeAndPush(PipelineEventGroup&& group) { + string serializedData, errorMsg; + BatchedEvents g(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + mGroupSerializer->DoSerialize(move(g), serializedData, errorMsg); + if (errorMsg.empty()) { + mFileWriter->info(serializedData); + } else { + LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); + } + mFileWriter->flush(); + return true; +} + +bool FlusherFile::SerializeAndPush(BatchedEventsList&& groupList) { + string serializedData; + for (auto& group : groupList) { + string errorMsg; + mGroupSerializer->DoSerialize(move(group), serializedData, errorMsg); + if (errorMsg.empty()) { + mFileWriter->info(serializedData); + } else { + LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); + } + } + mFileWriter->flush(); + return true; +} + +bool FlusherFile::SerializeAndPush(vector&& groupLists) { + for (auto& groupList : groupLists) { + SerializeAndPush(std::move(groupList)); + } + return true; +} + +} // namespace logtail \ No newline at end of file diff --git a/core/plugin/flusher/file/FlusherFile.h b/core/plugin/flusher/file/FlusherFile.h new file mode 100644 index 0000000000..0f8e706ac5 --- /dev/null +++ b/core/plugin/flusher/file/FlusherFile.h @@ -0,0 +1,55 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "pipeline/batch/Batcher.h" +#include "pipeline/plugin/interface/Flusher.h" +#include "pipeline/serializer/JsonSerializer.h" + +namespace logtail { + +class FlusherFile : public Flusher { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Send(PipelineEventGroup&& g) override; + bool Flush(size_t key) override; + bool FlushAll() override; + +private: + bool SerializeAndPush(PipelineEventGroup&& group); + bool SerializeAndPush(BatchedEventsList&& groupList); + bool SerializeAndPush(std::vector&& groupLists); + + std::shared_ptr mFileWriter; + std::string mFilePath; + std::string mPattern = "%v"; + uint32_t mMaxFileSize = 1024 * 1024 * 10; + uint32_t mMaxFiles = 10; + Batcher mBatcher; + std::unique_ptr mGroupSerializer; + + CounterPtr mSendCnt; +}; + +} // namespace logtail diff --git a/core/plugin/input/InputInternalMetrics.cpp b/core/plugin/input/InputInternalMetrics.cpp new file mode 100644 index 0000000000..01f77c62b1 --- /dev/null +++ b/core/plugin/input/InputInternalMetrics.cpp @@ -0,0 +1,64 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "plugin/input/InputInternalMetrics.h" + +namespace logtail { + +const std::string InputInternalMetrics::sName = "input_internal_metrics"; + +bool GetEnabled(const Json::Value& rule) { + if (rule.isMember("Enable") && rule["Enable"].isBool()) + return rule["Enable"].asBool(); + return true; +} + +int GetInterval(const Json::Value& rule) { + if (rule.isMember("Interval") && rule["Interval"].isInt()) + return rule["Interval"].asInt(); + return 10; +} + +void ParseSelfMonitorMetricRule(std::string&& ruleKey, const Json::Value& ruleJson, SelfMonitorMetricRule& rule) { + if (ruleJson.isMember(ruleKey) && ruleJson[ruleKey].isObject()) { + rule.mEnable = GetEnabled(ruleJson[ruleKey]); + rule.mInterval = GetInterval(ruleJson[ruleKey]); + } +} + +bool InputInternalMetrics::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + ParseSelfMonitorMetricRule("Agent", config, mSelfMonitorMetricRules.mAgentMetricsRule); + ParseSelfMonitorMetricRule("Runner", config, mSelfMonitorMetricRules.mRunnerMetricsRule); + ParseSelfMonitorMetricRule("Pipeline", config, mSelfMonitorMetricRules.mPipelineMetricsRule); + ParseSelfMonitorMetricRule("PluginSource", config, mSelfMonitorMetricRules.mPluginSourceMetricsRule); + ParseSelfMonitorMetricRule("Plugin", config, mSelfMonitorMetricRules.mPluginMetricsRule); + ParseSelfMonitorMetricRule("Component", config, mSelfMonitorMetricRules.mComponentMetricsRule); + return true; +} + +bool InputInternalMetrics::Start() { + SelfMonitorServer::GetInstance()->UpdateMetricPipeline(mContext, &mSelfMonitorMetricRules); + return true; +} + +bool InputInternalMetrics::Stop(bool isPipelineRemoving) { + if (isPipelineRemoving) { + SelfMonitorServer::GetInstance()->RemoveMetricPipeline(); + } + return true; +} + +} // namespace logtail \ No newline at end of file diff --git a/core/plugin/input/InputInternalMetrics.h b/core/plugin/input/InputInternalMetrics.h new file mode 100644 index 0000000000..694edf85af --- /dev/null +++ b/core/plugin/input/InputInternalMetrics.h @@ -0,0 +1,37 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "monitor/SelfMonitorServer.h" +#include "pipeline/plugin/interface/Input.h" + +namespace logtail { + +class InputInternalMetrics : public Input { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Start() override; + bool Stop(bool isPipelineRemoving) override; + bool SupportAck() const override { return true; } +private: + SelfMonitorMetricRules mSelfMonitorMetricRules; +}; + +} // namespace logtail \ No newline at end of file diff --git a/core/unittest/monitor/MetricManagerUnittest.cpp b/core/unittest/monitor/MetricManagerUnittest.cpp index 3625105227..343b6b9129 100644 --- a/core/unittest/monitor/MetricManagerUnittest.cpp +++ b/core/unittest/monitor/MetricManagerUnittest.cpp @@ -19,7 +19,6 @@ #include #include #include "MetricManager.h" -#include "MetricExportor.h" #include "MetricConstants.h" namespace logtail { @@ -63,7 +62,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() { fileCounter->Add(111UL); APSARA_TEST_EQUAL(fileCounter->GetValue(), 222); - MetricExportor::GetInstance()->PushMetrics(true); + ReadMetrics::GetInstance()->UpdateMetrics(); // assert WriteMetrics count MetricsRecord* tmp = WriteMetrics::GetInstance()->GetHead(); @@ -108,7 +107,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() { fileCounter3->Add(333UL); } - MetricExportor::GetInstance()->PushMetrics(true); + ReadMetrics::GetInstance()->UpdateMetrics(); // assert WriteMetrics count tmp = WriteMetrics::GetInstance()->GetHead(); @@ -130,13 +129,6 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() { APSARA_TEST_EQUAL(count, 1); } -void PushMetrics() { - for (int i = 0; i < 10; i++) { - LOG_INFO(sLogger, ("PushMetricsCount", i)); - MetricExportor::GetInstance()->PushMetrics(true); - } -} - void createMetrics(int count) { for (int i = 0; i < count; i++) { std::vector> labels; @@ -172,7 +164,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDeleteMultiThread() { APSARA_TEST_EQUAL(count, 10); for (int i = 0; i < 10; i++) { - MetricExportor::GetInstance()->PushMetrics(true); + ReadMetrics::GetInstance()->UpdateMetrics(); } // assert WriteMetrics count @@ -256,7 +248,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() { delete fileMetric2; delete fileMetric3; - MetricExportor::GetInstance()->PushMetrics(true); + ReadMetrics::GetInstance()->UpdateMetrics(); // assert WriteMetrics count tmp = WriteMetrics::GetInstance()->GetHead(); @@ -306,7 +298,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() { APSARA_TEST_EQUAL(fileCounter->GetValue(), 333); - MetricExportor::GetInstance()->PushMetrics(true); + ReadMetrics::GetInstance()->UpdateMetrics(); // assert ReadMetrics count tmp = ReadMetrics::GetInstance()->GetHead(); count = 0; diff --git a/pkg/helper/self_metrics_v2_imp.go b/pkg/helper/self_metrics_v2_imp.go index b9803f61ab..9faf77b4b1 100644 --- a/pkg/helper/self_metrics_v2_imp.go +++ b/pkg/helper/self_metrics_v2_imp.go @@ -109,6 +109,10 @@ func (c *cumulativeCounterImp) Export() map[string]string { return c.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64)) } +func (c *cumulativeCounterImp) Type() pipeline.SelfMetricType { + return pipeline.CounterType +} + // delta is a counter metric that can be incremented or decremented. // It gets the increased value in the last window. type counterImp struct { @@ -146,6 +150,10 @@ func (c *counterImp) Export() map[string]string { return c.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64)) } +func (c *counterImp) Type() pipeline.SelfMetricType { + return pipeline.CounterType +} + // gauge is a metric that represents a single numerical value that can arbitrarily go up and down. type gaugeImp struct { value float64 @@ -181,6 +189,10 @@ func (g *gaugeImp) Export() map[string]string { return g.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64)) } +func (g *gaugeImp) Type() pipeline.SelfMetricType { + return pipeline.GaugeType +} + // averageImp is a metric to compute the average value of a series of values in the last window. // if there is no value added in the last window, the previous average value will be returned. type averageImp struct { @@ -234,6 +246,10 @@ func (a *averageImp) Export() map[string]string { return a.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64)) } +func (a *averageImp) Type() pipeline.SelfMetricType { + return pipeline.GaugeType +} + // maxImp is a metric to compute the max value of a series of values in the last window. // if there is no value added in the last window, zero will be returned. type maxImp struct { @@ -270,6 +286,10 @@ func (m *maxImp) Export() map[string]string { return m.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value, 'f', 4, 64)) } +func (m *maxImp) Type() pipeline.SelfMetricType { + return pipeline.GaugeType +} + // latencyImp is a metric to compute the average latency of a series of values in the last window. type latencyImp struct { sync.Mutex @@ -324,6 +344,10 @@ func (l *latencyImp) Export() map[string]string { return l.Series.Export(metricValue.Name, strconv.FormatFloat(metricValue.Value/1000, 'f', 4, 64)) // ns to us } +func (l *latencyImp) Type() pipeline.SelfMetricType { + return pipeline.GaugeType +} + // strMetricImp is a metric that represents a single string value. type strMetricImp struct { sync.RWMutex @@ -366,6 +390,10 @@ func (s *strMetricImp) Export() map[string]string { return s.Series.Export(metricValue.Name, metricValue.Value) } +func (s *strMetricImp) Type() pipeline.SelfMetricType { + return pipeline.GaugeType +} + type Series struct { pipeline.MetricSet labelValues []string @@ -440,9 +468,14 @@ func (e *errorNumericMetric) Export() map[string]string { return nil } +func (e *errorNumericMetric) Type() pipeline.SelfMetricType { + return pipeline.CounterType +} + func (e *errorNumericMetric) Collect() pipeline.MetricValue[float64] { return pipeline.MetricValue[float64]{Name: "", Value: 0} } + func (e *errorNumericMetric) Clear() {} func newErrorNumericMetric(err error) *errorNumericMetric { diff --git a/pkg/pipeline/context.go b/pkg/pipeline/context.go index fc3ee989b0..a32f079d25 100644 --- a/pkg/pipeline/context.go +++ b/pkg/pipeline/context.go @@ -16,6 +16,7 @@ package pipeline import ( "context" + "encoding/json" "sync" "github.com/alibaba/ilogtail/pkg/config" @@ -30,8 +31,9 @@ type CommonContext struct { type LabelPair = Label const SelfMetricNameKey = "__name__" -const MetricLabelPrefix = "label." -const MetricValuePrefix = "value." +const MetricLabelPrefix = "labels" +const MetricCounterPrefix = "counters" +const MetricGaugePrefix = "gauges" type MetricsRecord struct { Context Context @@ -42,9 +44,12 @@ type MetricsRecord struct { } func (m *MetricsRecord) insertLabels(record map[string]string) { + labels := map[string]string{} for _, label := range m.Labels { - record[MetricLabelPrefix+label.Key] = label.Value + labels[label.Key] = label.Value } + labelsStr, _ := json.Marshal(labels) + record[MetricLabelPrefix] = string(labelsStr) } func (m *MetricsRecord) RegisterMetricCollector(collector MetricCollector) { @@ -63,16 +68,26 @@ func (m *MetricsRecord) ExportMetricRecords() map[string]string { m.insertLabels(record) for _, metricCollector := range m.MetricCollectors { metrics := metricCollector.Collect() - + counters := map[string]string{} + gauges := map[string]string{} for _, metric := range metrics { - singleMetricRecord := metric.Export() - if len(singleMetricRecord) == 0 { + singleMetric := metric.Export() + if len(singleMetric) == 0 { continue } - valueName := singleMetricRecord[SelfMetricNameKey] - valueValue := singleMetricRecord[valueName] - record[MetricValuePrefix+valueName] = valueValue + valueName := singleMetric[SelfMetricNameKey] + valueValue := singleMetric[valueName] + if metric.Type() == CounterType { + counters[valueName] = valueValue + } + if metric.Type() == GaugeType { + gauges[valueName] = valueValue + } } + countersStr, _ := json.Marshal(counters) + record[MetricCounterPrefix] = string(countersStr) + gaugesStr, _ := json.Marshal(gauges) + record[MetricGaugePrefix] = string(gaugesStr) } return record } diff --git a/pkg/pipeline/self_metrics.go b/pkg/pipeline/self_metrics.go index ef84b046d5..08d2f2e940 100644 --- a/pkg/pipeline/self_metrics.go +++ b/pkg/pipeline/self_metrics.go @@ -71,6 +71,7 @@ type MetricVector[T Metric] interface { type Metric interface { // Export as a map[string]string Export() map[string]string + Type() SelfMetricType } // CounterMetric has three implementations: diff --git a/pluginmanager/metric_export.go b/pluginmanager/metric_export.go index 8fab36eb2d..96b62c1e18 100644 --- a/pluginmanager/metric_export.go +++ b/pluginmanager/metric_export.go @@ -41,12 +41,15 @@ func GetMetrics(metricType string) []map[string]string { // // []map[string]string{ // { -// "label.plugin_name": "processor_test", -// "value.proc_in_records_total": "100", +// "labels": "{\"category\": \"plugin\",\"plugin_type\":\"flusher_stdout\"}", +// "counters": "{\"proc_in_records_total\": \"100\"}" +// "gauges": "{}" // }, // { -// "label.plugin_name": "flusher_stdout", -// "value.flusher_in_records_total": "100", +// "labels": "{\"category\": \"runner\",\"runner_name\":\"k8s_meta\"}", +// "counters": "{\"proc_in_records_total\": \"100\"}", +// "gauges": "{\"cache_size\": \"100\"}" +// } // }, // } func GetGoDirectMetrics() []map[string]string { @@ -62,8 +65,8 @@ func GetGoDirectMetrics() []map[string]string { // // []map[string]string{ // { -// "agent_go_memory_used_mb": "100", -// "agent_go_routines_total": "20" +// "go_memory_used_mb": "100", +// "go_routines_total": "20" // } // } func GetGoCppProvidedMetrics() []map[string]string {