Skip to content

Commit

Permalink
Merge remote-tracking branch 'alibaba/main' into dida
Browse files Browse the repository at this point in the history
# Conflicts:
#	CHANGELOG.md
  • Loading branch information
quzard committed Jul 28, 2023
2 parents 722fc99 + 4882126 commit b303c29
Show file tree
Hide file tree
Showing 111 changed files with 3,043 additions and 677 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/build-core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ on:
branches:
- main
- 1.*

jobs:
CI:
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
timeout-minutes: 60
strategy:
matrix:
go-version: [1.19]
Expand Down Expand Up @@ -62,6 +63,10 @@ jobs:
ENABLE_STATIC_LINK_CRT: ON
run: make core

- name: Unit Test
if: matrix.runner == 'ubuntu-latest'
run: make unittest_core

result:
runs-on: ubuntu-latest
timeout-minutes: 30
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e-core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
go-version: [ 1.19 ]
go-version: [ 1.19.10 ]
runner: [ ubuntu ]
fail-fast: true
steps:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
timeout-minutes: 60
strategy:
matrix:
go-version: [ 1.19 ]
go-version: [ 1.19.10 ]
runner: [ ubuntu ]
fail-fast: true
steps:
Expand Down Expand Up @@ -56,8 +56,8 @@ jobs:
- name: UnitTest E2e Engine
if: matrix.runner == 'ubuntu'
run: |
sudo go version
sudo make unittest_e2e_engine
go version
make unittest_e2e_engine
result:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ your changes, such as:

## [Unreleased]

- [public] [both] [updated] elasticsearch flusher new features: send batch request by bulk api and format index
- [public] [both] [fixed] elasticsearch flusher panic
- [public] [both] [doc] elasticsearch flusher config examples
- [public] [both] [added] add new plugin: input_command
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ unittest_plugin: clean import_plugins
mv ./plugins/input/prometheus/input_prometheus.go.bak ./plugins/input/prometheus/input_prometheus.go
rm -rf plugins/input/jmxfetch/test/

.PHONY: unittest_core
unittest_core:
./scripts/run_core_ut.sh

.PHONY: unittest_pluginmanager
unittest_pluginmanager: clean import_plugins
cp pkg/logtail/libPluginAdapter.so ./plugin_main
Expand Down
2 changes: 1 addition & 1 deletion config_server/service/manager/config/agent_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *ConfigManager) CheckConfigUpdatesWhenHeartbeat(req *proto.HeartBeatRequ
result.OldVersion = k.Version
result.NewVersion = config.Version
result.Context = config.Context
result.CheckStatus = proto.CheckStatus_DELETED
result.CheckStatus = proto.CheckStatus_MODIFIED
agentConfigs = append(agentConfigs, result)
}
delete(SelectedConfigs, k.Name)
Expand Down
50 changes: 36 additions & 14 deletions config_server/service/test/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestOperationsBetweenConfigAndAgentGroup(t *testing.T) {
// check
So(status, ShouldEqual, common.InternalServerError.Status)
So(res.ResponseId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, common.InternalServerError.Code)
So(res.Code, ShouldEqual, proto.RespCode_INTERNAL_SERVER_ERROR)
So(res.Message, ShouldEqual, fmt.Sprintf("Config %s was applied to some agent groups, cannot be deleted.", configName))

requestID++
Expand All @@ -625,7 +625,7 @@ func TestOperationsBetweenConfigAndAgentGroup(t *testing.T) {
// check
So(status, ShouldEqual, common.InternalServerError.Status)
So(res.ResponseId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, common.InternalServerError.Code)
So(res.Code, ShouldEqual, proto.RespCode_INTERNAL_SERVER_ERROR)
So(res.Message, ShouldEqual, fmt.Sprintf("Config %s was applied to some agent groups, cannot be deleted.", configName))

requestID++
Expand Down Expand Up @@ -737,20 +737,23 @@ func TestAgentSendMessage(t *testing.T) {
var requestID int

Convey("Test Agent send message.", t, func() {
configInfos := make([]*proto.ConfigCheckResult, 0)

fmt.Print("\n\t" + fmt.Sprint(requestID) + ":Test ilogtail-1 send Heartbeat. ")
{
agent := new(proto.Agent)
agent.AgentId = "ilogtail-1"
agent.Attributes = &proto.AgentAttributes{}
agent.RunningStatus = "good"
agent.StartupTime = 100

status, res := HeartBeat(r, agent, fmt.Sprint(requestID))
status, res := HeartBeat(r, agent, configInfos, fmt.Sprint(requestID))

// check
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Send heartbeat success")
So(res.Message, ShouldEqual, "Send heartbeat successGet config update infos success")

requestID++
}
Expand All @@ -766,16 +769,17 @@ func TestAgentSendMessage(t *testing.T) {
{
agent := new(proto.Agent)
agent.AgentId = "ilogtail-2"
agent.Attributes = &proto.AgentAttributes{}
agent.RunningStatus = "good"
agent.StartupTime = 200

status, res := HeartBeat(r, agent, fmt.Sprint(requestID))
status, res := HeartBeat(r, agent, configInfos, fmt.Sprint(requestID))

// check
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Send heartbeat success")
So(res.Message, ShouldEqual, "Send heartbeat successGet config update infos success")

requestID++
}
Expand All @@ -791,16 +795,17 @@ func TestAgentSendMessage(t *testing.T) {
{
agent := new(proto.Agent)
agent.AgentId = "ilogtail-1"
agent.Attributes = &proto.AgentAttributes{}
agent.RunningStatus = "good"
agent.StartupTime = 100

status, res := HeartBeat(r, agent, fmt.Sprint(requestID))
status, res := HeartBeat(r, agent, configInfos, fmt.Sprint(requestID))

// check
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Send heartbeat success")
So(res.Message, ShouldEqual, "Send heartbeat successGet config update infos success")

requestID++
}
Expand Down Expand Up @@ -840,6 +845,7 @@ func TestAgentGetConfig(t *testing.T) {
Convey("Test Agent get config.", t, func() {
agent := new(proto.Agent)
agent.AgentId = "ilogtail-1"
agent.Attributes = &proto.AgentAttributes{}
agent.RunningStatus = "good"
agent.StartupTime = 100

Expand Down Expand Up @@ -970,15 +976,23 @@ func TestAgentGetConfig(t *testing.T) {
requestID++
}

fmt.Print("\n\t" + fmt.Sprint(requestID) + ":Sleep 3s, wait for writing config info to store. ")
{
time.Sleep(time.Second * 3)

requestID++
}

fmt.Print("\n\t" + fmt.Sprint(requestID) + ":Test ilogtail-1 send Heartbeat. ")
{
status, res := HeartBeat(r, agent, fmt.Sprint(requestID))
status, res := HeartBeat(r, agent, configInfos, fmt.Sprint(requestID))

// check
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Send heartbeat success")
So(res.Message, ShouldEqual, "Send heartbeat successGet config update infos success")
So(len(res.PipelineCheckResults), ShouldEqual, 3)
for _, info := range res.PipelineCheckResults {
configVersions[info.Name] = info.NewVersion
switch info.Name {
Expand All @@ -1003,7 +1017,7 @@ func TestAgentGetConfig(t *testing.T) {
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Get config update infos success")
So(res.Message, ShouldEqual, "Get Agent Config details success")
So(len(res.ConfigDetails), ShouldEqual, 3)
requestID++
}
Expand Down Expand Up @@ -1059,15 +1073,23 @@ func TestAgentGetConfig(t *testing.T) {
requestID++
}

fmt.Print("\n\t" + fmt.Sprint(requestID) + ":Sleep 3s, wait for writing config info to store. ")
{
time.Sleep(time.Second * 3)

requestID++
}

fmt.Print("\n\t" + fmt.Sprint(requestID) + ":Test ilogtail-1 send Heartbeat. ")
{
status, res := HeartBeat(r, agent, fmt.Sprint(requestID))
status, res := HeartBeat(r, agent, configInfos, fmt.Sprint(requestID))

// check
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Send heartbeat success")
So(res.Message, ShouldEqual, "Send heartbeat successGet config update infos success")
So(len(res.PipelineCheckResults), ShouldEqual, 3)
configVersions = map[string]int64{}
for _, info := range res.PipelineCheckResults {
configVersions[info.Name] = info.NewVersion
Expand All @@ -1093,7 +1115,7 @@ func TestAgentGetConfig(t *testing.T) {
So(status, ShouldEqual, common.Accept.Status)
So(res.RequestId, ShouldEqual, fmt.Sprint(requestID))
So(res.Code, ShouldEqual, proto.RespCode_ACCEPT)
So(res.Message, ShouldEqual, "Get config update infos success")
So(res.Message, ShouldEqual, "Get Agent Config details success")
So(len(res.ConfigDetails), ShouldEqual, 3)
requestID++
}
Expand Down
22 changes: 16 additions & 6 deletions config_server/service/test/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func GetConfig(r *gin.Engine, configName string, requestID string) (int, *config

// request
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/User/GetConfig", bytes.NewBuffer(reqBodyByte))
req, _ := http.NewRequest("POST", "/User/GetConfig", bytes.NewBuffer(reqBodyByte))
r.ServeHTTP(w, req)

// response
Expand Down Expand Up @@ -139,7 +139,7 @@ func ListConfigs(r *gin.Engine, requestID string) (int, *configserverproto.ListC

// request
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/User/ListConfigs", bytes.NewBuffer(reqBodyByte))
req, _ := http.NewRequest("POST", "/User/ListConfigs", bytes.NewBuffer(reqBodyByte))
r.ServeHTTP(w, req)

// response
Expand Down Expand Up @@ -204,7 +204,7 @@ func GetAppliedConfigsForAgentGroup(r *gin.Engine, groupName string, requestID s

// request
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/User/GetAppliedConfigsForAgentGroup", bytes.NewBuffer(reqBodyByte))
req, _ := http.NewRequest("POST", "/User/GetAppliedConfigsForAgentGroup", bytes.NewBuffer(reqBodyByte))
r.ServeHTTP(w, req)

// response
Expand All @@ -225,7 +225,7 @@ func GetAppliedAgentGroups(r *gin.Engine, configName string, requestID string) (

// request
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/User/GetAppliedAgentGroups", bytes.NewBuffer(reqBodyByte))
req, _ := http.NewRequest("POST", "/User/GetAppliedAgentGroups", bytes.NewBuffer(reqBodyByte))
r.ServeHTTP(w, req)

// response
Expand All @@ -246,7 +246,7 @@ func ListAgents(r *gin.Engine, groupName string, requestID string) (int, *config

// request
w := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/User/ListAgents", bytes.NewBuffer(reqBodyByte))
req, _ := http.NewRequest("POST", "/User/ListAgents", bytes.NewBuffer(reqBodyByte))
r.ServeHTTP(w, req)

// response
Expand All @@ -258,15 +258,25 @@ func ListAgents(r *gin.Engine, groupName string, requestID string) (int, *config
return res.StatusCode, resBody
}

func HeartBeat(r *gin.Engine, agent *configserverproto.Agent, requestID string) (int, *configserverproto.HeartBeatResponse) {
func HeartBeat(r *gin.Engine, agent *configserverproto.Agent, configInfos []*configserverproto.ConfigCheckResult, requestID string) (int, *configserverproto.HeartBeatResponse) {
// data
reqBody := configserverproto.HeartBeatRequest{}
reqBody.RequestId = requestID
reqBody.AgentId = agent.AgentId
reqBody.AgentType = agent.AgentType
reqBody.Attributes = agent.Attributes
reqBody.RunningStatus = agent.RunningStatus
reqBody.StartupTime = agent.StartupTime
reqBody.Tags = agent.Tags
pipelineConfigs := make([]*configserverproto.ConfigInfo, 0)
for _, c := range configInfos {
conf := new(configserverproto.ConfigInfo)
conf.Type = c.Type
conf.Name = c.Name
conf.Version = c.OldVersion
pipelineConfigs = append(pipelineConfigs, conf)
}
reqBody.PipelineConfigs = pipelineConfigs
reqBodyByte, _ := proto.Marshal(&reqBody)

// request
Expand Down
10 changes: 6 additions & 4 deletions core/controller/EventDispatcherBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,10 @@ bool EventDispatcherBase::Dispatch() {
SyncWindowsSignalObject();
#endif

if (LogtailGlobalPara::Instance()->GetSigtermFlag())
if (LogtailGlobalPara::Instance()->GetSigtermFlag()) {
LOG_INFO(sLogger, ("received SIGTERM signal", "exit process"));
ExitProcess();
}

#if defined(__linux__)
int nfd = epoll_wait(
Expand Down Expand Up @@ -1216,8 +1218,10 @@ void EventDispatcherBase::UpdateConfig() {
void EventDispatcherBase::ExitProcess() {
#if defined(__linux__)
if (mStreamLogManagerPtr != NULL) {
LOG_INFO(sLogger, ("StreamLogManager", "shutdown"));
((StreamLogManager*)mStreamLogManagerPtr)->Shutdown();
}
ObserverManager::GetInstance()->HoldOn(true);
#endif

LOG_INFO(sLogger, ("LogInput", "hold on"));
Expand All @@ -1243,13 +1247,11 @@ void EventDispatcherBase::ExitProcess() {
LOG_INFO(sLogger, ("flush log process buffer", "start"));

// resume log process thread to process last buffer
// previously hold on by LogInput
LogProcess::GetInstance()->Resume();
Sender::Instance()->SetQueueUrgent();
// exit logtail plugin
LogtailPlugin::GetInstance()->HoldOn(true);
#if defined(__linux__)
ObserverManager::GetInstance()->HoldOn(true);
#endif

bool logProcessFlushFlag = false;

Expand Down
2 changes: 2 additions & 0 deletions core/logtail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);

void HandleSighupSignal(int signum, siginfo_t* info, void* context) {
APSARA_LOG_INFO(sLogger, ("received signal", "SIGHUP"));
ConfigManager::GetInstance()->SetMappingPathsChanged();
}

void HandleSigtermSignal(int signum, siginfo_t* info, void* context) {
APSARA_LOG_INFO(sLogger, ("received signal", "SIGTERM"));
LogtailGlobalPara::Instance()->SetSigtermFlag(true);
}

Expand Down
Loading

0 comments on commit b303c29

Please sign in to comment.