Skip to content

Commit

Permalink
Optimize the limiter code to meet better isolation and recovery scena…
Browse files Browse the repository at this point in the history
…rios (#1985)
  • Loading branch information
linrunqi08 authored Dec 27, 2024
1 parent 82aed45 commit 2dc7605
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 120 deletions.
25 changes: 25 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ DEFINE_FLAG_STRING(logtail_snapshot_dir, "snapshot dir on local disk", "snapshot
DEFINE_FLAG_STRING(logtail_profile_snapshot, "reader profile on local disk", "logtail_profile_snapshot");
DEFINE_FLAG_STRING(ilogtail_config_env_name, "config file path", "ALIYUN_LOGTAIL_CONFIG");


#if defined(__linux__)
DEFINE_FLAG_STRING(adhoc_check_point_file_dir, "", "/tmp/logtail_adhoc_checkpoint");
#elif defined(_MSC_VER)
Expand All @@ -194,6 +195,21 @@ DEFINE_FLAG_STRING(sls_observer_ebpf_host_path,
namespace logtail {
constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread


// Headroom kept in the global concurrency on top of a single region's concurrency,
// expressed as a fraction of the per-region value (0.5 => global = per-region * 1.5).
const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION = 0.5;
// Minimum per-region concurrency.
const int32_t MIN_SEND_REQUEST_CONCURRENCY = 15;
// Maximum per-region concurrency.
const int32_t MAX_SEND_REQUEST_CONCURRENCY = 80;
// Concurrency statistics window: sample count and time interval (seconds).
const uint32_t CONCURRENCY_STATISTIC_THRESHOLD = 10;
const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS = 3;
// Failure-percentage threshold at or below which concurrency does not fall back.
const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE = 10;
// Failure-percentage threshold at or below which concurrency falls back slowly.
const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE = 40;

std::string AppConfig::sLocalConfigDir = "local";
void CreateAgentDir() {
try {
Expand Down Expand Up @@ -1161,6 +1177,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mBindInterface.clear();
LOG_INFO(sLogger, ("bind_interface", mBindInterface));
}

// Clamp mSendRequestConcurrency to [MIN_SEND_REQUEST_CONCURRENCY, MAX_SEND_REQUEST_CONCURRENCY].
if (mSendRequestConcurrency < MIN_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MIN_SEND_REQUEST_CONCURRENCY;
}
if (mSendRequestConcurrency > MAX_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MAX_SEND_REQUEST_CONCURRENCY;
}
mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION);
}

bool AppConfig::CheckAndResetProxyEnv() {
Expand Down
17 changes: 16 additions & 1 deletion core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
namespace logtail {
extern const int32_t kDefaultMaxSendBytePerSec;

// Concurrency-limiter tuning constants (declarations only; the values are defined in AppConfig.cpp).
extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION;
extern const int32_t MIN_SEND_REQUEST_CONCURRENCY;
extern const int32_t MAX_SEND_REQUEST_CONCURRENCY;
extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD;
extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS;
extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE;
extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE;

void CreateAgentDir();

std::string GetAgentLogDir();
Expand Down Expand Up @@ -131,6 +139,7 @@ class AppConfig {
int32_t mNumOfBufferFile;
int32_t mLocalFileSize;
int32_t mSendRequestConcurrency;
int32_t mSendRequestGlobalConcurrency;
std::string mBufferFilePath;

// checkpoint
Expand Down Expand Up @@ -207,6 +216,8 @@ class AppConfig {

std::string mBindInterface;



// /**
// * @brief Load ConfigServer, DataServer and network interface
// *
Expand Down Expand Up @@ -434,8 +445,12 @@ class AppConfig {
int32_t GetLocalFileSize() const { return mLocalFileSize; }

const std::string& GetBufferFilePath() const { return mBufferFilePath; }

// Per-region send concurrency (returns mSendRequestConcurrency).
int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; }
// Global send concurrency (returns mSendRequestGlobalConcurrency).
int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; }

// Returns the constant GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION (not a per-instance setting).
double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; }

int32_t GetProcessThreadCount() const { return mProcessThreadCount; }

Expand Down
92 changes: 61 additions & 31 deletions core/pipeline/limiter/ConcurrencyLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@
using namespace std;

namespace logtail {

#ifdef APSARA_UNIT_TEST_MAIN
// Unit-test accessor (compiled only under APSARA_UNIT_TEST_MAIN):
// reads the current concurrency limit while holding mLimiterMux.
uint32_t ConcurrencyLimiter::GetCurrentLimit() const {
    std::lock_guard<std::mutex> guard(mLimiterMux);
    const uint32_t currentLimit = mCurrenctConcurrency;
    return currentLimit;
}

uint32_t ConcurrencyLimiter::GetCurrentInterval() const {
lock_guard<mutex> lock(mLimiterMux);
return mRetryIntervalSecs;
}
void ConcurrencyLimiter::SetCurrentLimit(uint32_t limit) {
lock_guard<mutex> lock(mLimiterMux);
mCurrenctConcurrency = limit;
Expand All @@ -42,19 +37,15 @@ void ConcurrencyLimiter::SetInSendingCount(uint32_t count) {
uint32_t ConcurrencyLimiter::GetInSendingCount() const {
return mInSendingCnt.load();
}

// Unit-test accessor: exposes CONCURRENCY_STATISTIC_THRESHOLD, the sample count
// that AdjustConcurrency compares mStatisticsTotal against.
uint32_t ConcurrencyLimiter::GetStatisticThreshold() const {
    const uint32_t threshold = CONCURRENCY_STATISTIC_THRESHOLD;
    return threshold;
}

#endif

bool ConcurrencyLimiter::IsValidToPop() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency == 0) {
auto curTime = std::chrono::system_clock::now();
if (chrono::duration_cast<chrono::seconds>(curTime - mLastCheckTime).count() > mRetryIntervalSecs) {
mLastCheckTime = curTime;
return true;
} else {
return false;
}
}
if (mCurrenctConcurrency > mInSendingCnt.load()) {
return true;
}
Expand All @@ -69,16 +60,20 @@ void ConcurrencyLimiter::OnSendDone() {
--mInSendingCnt;
}

void ConcurrencyLimiter::OnSuccess() {
// Reports one successful outcome at currentTime; forwards to AdjustConcurrency with success = true.
void ConcurrencyLimiter::OnSuccess(std::chrono::system_clock::time_point currentTime) {
    const bool succeeded = true;
    AdjustConcurrency(succeeded, currentTime);
}

// Reports one failed outcome at currentTime; forwards to AdjustConcurrency with success = false.
void ConcurrencyLimiter::OnFail(std::chrono::system_clock::time_point currentTime) {
    const bool succeeded = false;
    AdjustConcurrency(succeeded, currentTime);
}

void ConcurrencyLimiter::Increase() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency <= 0) {
mRetryIntervalSecs = mMinRetryIntervalSecs;
LOG_INFO(sLogger, ("reset send retry interval, type", mDescription));
}
if (mCurrenctConcurrency != mMaxConcurrency) {
++mCurrenctConcurrency;
if (mCurrenctConcurrency == mMaxConcurrency) {
LOG_INFO(sLogger,
LOG_DEBUG(sLogger,
("increase send concurrency to maximum, type", mDescription)("concurrency", mCurrenctConcurrency));
} else {
LOG_DEBUG(sLogger,
Expand All @@ -88,22 +83,57 @@ void ConcurrencyLimiter::OnSuccess() {
}
}

void ConcurrencyLimiter::OnFail() {
void ConcurrencyLimiter::Decrease(double fallBackRatio) {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency != 0) {
if (mCurrenctConcurrency != mMinConcurrency) {
auto old = mCurrenctConcurrency;
mCurrenctConcurrency = static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownRatio);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
mCurrenctConcurrency = std::max(static_cast<uint32_t>(mCurrenctConcurrency * fallBackRatio), mMinConcurrency);
LOG_DEBUG(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
} else {
if (mRetryIntervalSecs != mMaxRetryIntervalSecs) {
auto old = mRetryIntervalSecs;
mRetryIntervalSecs
= min(mMaxRetryIntervalSecs, static_cast<uint32_t>(mRetryIntervalSecs * mRetryIntervalUpRatio));
LOG_INFO(sLogger,
("increase send retry interval, type",
mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s"));
if (mMinConcurrency == 0) {
mCurrenctConcurrency = 1;
LOG_INFO(sLogger, ("decrease send concurrency to min, type", mDescription)("to", mCurrenctConcurrency));
}
}
}


// Records one send outcome and, once a statistics window closes, adjusts the concurrency limit.
//
// A window closes when mStatisticsTotal equals CONCURRENCY_STATISTIC_THRESHOLD, or when more than
// CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS whole seconds separate currentTime from
// mLastStatisticsTime. The failure percentage of the closed window (integer division) picks the action:
//   0                                        -> Increase()
//   1 .. NO_FALL_BACK_FAIL_PERCENTAGE        -> limit left unchanged
//   .. up to SLOW_FALL_BACK_FAIL_PERCENTAGE  -> Decrease(mConcurrencySlowFallBackRatio)
//   anything higher                          -> Decrease(mConcurrencyFastFallBackRatio)
//
// @param success      true when the reported outcome is a success (OnSuccess), false otherwise (OnFail)
// @param currentTime  timestamp supplied by the caller for this outcome
void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime) {
    uint32_t failPercentage = 0;
    {
        // The counters are guarded by mStatisticsMux; the guard is released before
        // Increase()/Decrease() are called below.
        std::lock_guard<std::mutex> statsGuard(mStatisticsMux);
        ++mStatisticsTotal;
        if (!success) {
            ++mStatisticsFailTotal;
        }
        // A default-constructed time_point means no window has been started yet.
        if (mLastStatisticsTime == std::chrono::system_clock::time_point()) {
            mLastStatisticsTime = currentTime;
        }

        const bool windowFull = (mStatisticsTotal == CONCURRENCY_STATISTIC_THRESHOLD);
        const bool windowExpired
            = std::chrono::duration_cast<std::chrono::seconds>(currentTime - mLastStatisticsTime).count()
            > CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS;
        if (!windowFull && !windowExpired) {
            // Window still open: nothing to adjust yet.
            return;
        }

        failPercentage = mStatisticsFailTotal * 100 / mStatisticsTotal;
        LOG_DEBUG(sLogger,("AdjustConcurrency", mDescription)("mStatisticsFailTotal", mStatisticsFailTotal)("mStatisticsTotal", mStatisticsTotal));
        // Start a fresh window.
        mStatisticsTotal = 0;
        mStatisticsFailTotal = 0;
        mLastStatisticsTime = currentTime;
    }

    if (failPercentage == 0) {
        // All outcomes in the window succeeded.
        Increase();
        return;
    }
    if (failPercentage <= NO_FALL_BACK_FAIL_PERCENTAGE) {
        // Few failures: no adjustment.
        return;
    }
    if (failPercentage <= SLOW_FALL_BACK_FAIL_PERCENTAGE) {
        // Slow fall back.
        Decrease(mConcurrencySlowFallBackRatio);
        return;
    }
    // Fast fall back.
    Decrease(mConcurrencyFastFallBackRatio);
}


} // namespace logtail
42 changes: 23 additions & 19 deletions core/pipeline/limiter/ConcurrencyLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,31 @@
#include <mutex>
#include <string>

#include "app_config/AppConfig.h"
#include "monitor/metric_constants/MetricConstants.h"

namespace logtail {

class ConcurrencyLimiter {
public:
ConcurrencyLimiter(const std::string& description,
uint32_t maxConcurrency,
uint32_t maxRetryIntervalSecs = 3600,
uint32_t minRetryIntervalSecs = 30,
double retryIntervalUpRatio = 1.5,
double concurrencyDownRatio = 0.5)
uint32_t minConcurrency = 1,
double concurrencyFastFallBackRatio = 0.5,
double concurrencySlowFallBackRatio = 0.8)
: mDescription(description),
mMaxConcurrency(maxConcurrency),
mMinConcurrency(minConcurrency),
mCurrenctConcurrency(maxConcurrency),
mMaxRetryIntervalSecs(maxRetryIntervalSecs),
mMinRetryIntervalSecs(minRetryIntervalSecs),
mRetryIntervalSecs(minRetryIntervalSecs),
mRetryIntervalUpRatio(retryIntervalUpRatio),
mConcurrencyDownRatio(concurrencyDownRatio) {}
mConcurrencyFastFallBackRatio(concurrencyFastFallBackRatio),
mConcurrencySlowFallBackRatio(concurrencySlowFallBackRatio) {}

bool IsValidToPop();
void PostPop();
void OnSendDone();

void OnSuccess();
void OnFail();
void OnSuccess(std::chrono::system_clock::time_point currentTime);
void OnFail(std::chrono::system_clock::time_point currentTime);


static std::string GetLimiterMetricName(const std::string& limiter) {
if (limiter == "region") {
Expand All @@ -64,10 +62,10 @@ class ConcurrencyLimiter {
#ifdef APSARA_UNIT_TEST_MAIN

uint32_t GetCurrentLimit() const;
uint32_t GetCurrentInterval() const;
void SetCurrentLimit(uint32_t limit);
void SetInSendingCount(uint32_t count);
uint32_t GetInSendingCount() const;
uint32_t GetStatisticThreshold() const;

#endif

Expand All @@ -77,19 +75,25 @@ class ConcurrencyLimiter {
std::atomic_uint32_t mInSendingCnt = 0U;

uint32_t mMaxConcurrency = 0;
uint32_t mMinConcurrency = 0;

mutable std::mutex mLimiterMux;
uint32_t mCurrenctConcurrency = 0;

uint32_t mMaxRetryIntervalSecs = 0;
uint32_t mMinRetryIntervalSecs = 0;
double mConcurrencyFastFallBackRatio = 0.0;
double mConcurrencySlowFallBackRatio = 0.0;

uint32_t mRetryIntervalSecs = 0;
std::chrono::system_clock::time_point mLastCheckTime;

double mRetryIntervalUpRatio = 0.0;
double mConcurrencyDownRatio = 0.0;
mutable std::mutex mStatisticsMux;
std::chrono::system_clock::time_point mLastStatisticsTime;
uint32_t mStatisticsTotal = 0;
uint32_t mStatisticsFailTotal = 0;

void Increase();
void Decrease(double fallBackRatio);
void AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime);

std::chrono::system_clock::time_point mLastCheckTime;
};

} // namespace logtail
7 changes: 0 additions & 7 deletions core/pipeline/queue/BoundedSenderQueueInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ void BoundedSenderQueueInterface::SetConcurrencyLimiters(std::unordered_map<std:
}
}

// Calls OnSuccess() on the first element of every entry in mConcurrencyLimiters,
// skipping entries whose first element is null.
void BoundedSenderQueueInterface::OnSendingSuccess() {
    for (auto& entry : mConcurrencyLimiters) {
        auto& limiterPtr = entry.first;
        if (limiterPtr == nullptr) {
            continue;
        }
        limiterPtr->OnSuccess();
    }
}

void BoundedSenderQueueInterface::DecreaseSendingCnt() {
for (auto& limiter : mConcurrencyLimiters) {
Expand Down
1 change: 0 additions & 1 deletion core/pipeline/queue/BoundedSenderQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr
virtual void GetAvailableItems(std::vector<SenderQueueItem*>& items, int32_t limit) = 0;

void DecreaseSendingCnt();
void OnSendingSuccess();
void SetRateLimiter(uint32_t maxRate);
void SetConcurrencyLimiters(std::unordered_map<std::string, std::shared_ptr<ConcurrencyLimiter>>&& concurrencyLimitersMap);
virtual void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const = 0;
Expand Down
Loading

0 comments on commit 2dc7605

Please sign in to comment.