Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Jan 9, 2024
1 parent 669fa09 commit 2f01470
Show file tree
Hide file tree
Showing 23 changed files with 114 additions and 104 deletions.
16 changes: 4 additions & 12 deletions pkg/helper/collector_imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ type observePipeCollector struct {
procTimeMS pipeline.CounterMetric
}

func (p *observePipeCollector) Prepare(metricRecord *pipeline.MetricsRecord) {
p.metricRecord = metricRecord
p.procInRecordsTotal = NewCounterMetric("proc_in_records_total")
p.procOutRecordsTotal = NewCounterMetric("proc_out_records_total")
p.procTimeMS = NewCounterMetric("proc_time_ms")
}

func (p *observePipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
if len(events) == 0 {
Expand All @@ -44,6 +38,7 @@ func (p *observePipeCollector) Collect(group *models.GroupInfo, events ...models
Group: group,
Events: events,
}
p.procOutRecordsTotal.Add(int64(len(events)))
}

func (p *observePipeCollector) CollectList(groups ...*models.PipelineGroupEvents) {
Expand All @@ -53,6 +48,7 @@ func (p *observePipeCollector) CollectList(groups ...*models.PipelineGroupEvents
for _, g := range groups {
p.procInRecordsTotal.Add(int64(len(g.Events)))
p.groupChan <- g
p.procOutRecordsTotal.Add(int64(len(g.Events)))
}
}

Expand Down Expand Up @@ -84,12 +80,6 @@ type groupedPipeCollector struct {
procTimeMS pipeline.CounterMetric
}

func (p *groupedPipeCollector) Prepare(metricRecord *pipeline.MetricsRecord) {
p.metricRecord = metricRecord
p.procInRecordsTotal = NewCounterMetric("proc_in_records_total")
p.procOutRecordsTotal = NewCounterMetric("proc_out_records_total")
p.procTimeMS = NewCounterMetric("proc_time_ms")
}

func (p *groupedPipeCollector) Collect(group *models.GroupInfo, events ...models.PipelineEvent) {
if len(events) == 0 {
Expand All @@ -101,6 +91,7 @@ func (p *groupedPipeCollector) Collect(group *models.GroupInfo, events ...models
store = make([]models.PipelineEvent, 0)
}
p.groupEvents[group] = append(store, events...)
p.procOutRecordsTotal.Add(int64(len(events)))
}

func (p *groupedPipeCollector) CollectList(groups ...*models.PipelineGroupEvents) {
Expand All @@ -110,6 +101,7 @@ func (p *groupedPipeCollector) CollectList(groups ...*models.PipelineGroupEvents
for _, g := range groups {
p.procInRecordsTotal.Add(int64(len(g.Events)))
p.Collect(g.Group, g.Events...)
p.procOutRecordsTotal.Add(int64(len(g.Events)))
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/pipeline_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
// PipelineCollector collect data in the plugin and send the data to the next operator
type PipelineCollector interface { //nolint

Prepare(metricRecord *MetricsRecord)
// Collect single group and events data belonging to this group
Collect(groupInfo *models.GroupInfo, eventList ...models.PipelineEvent)

Expand Down
3 changes: 3 additions & 0 deletions pluginmanager/aggregator_wrapper_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (p *AggregatorWrapperV1) Add(loggroup *protocol.LogGroup) error {
p.procInRecordsTotal.Add(int64(len(loggroup.Logs)))
select {
case p.LogGroupsChan <- loggroup:
p.procOutRecordsTotal.Add(int64(len(loggroup.Logs)))
return nil
default:
return errAggAdd
Expand All @@ -99,6 +100,7 @@ func (p *AggregatorWrapperV1) AddWithWait(loggroup *protocol.LogGroup, duration
timer := time.NewTimer(duration)
select {
case p.LogGroupsChan <- loggroup:
p.procOutRecordsTotal.Add(int64(len(loggroup.Logs)))
return nil
case <-timer.C:
return errAggAdd
Expand All @@ -118,6 +120,7 @@ func (p *AggregatorWrapperV1) Run(control *pipeline.AsyncControl) {
}
p.procInRecordsTotal.Add(int64(len(logGroup.Logs)))
p.LogGroupsChan <- logGroup
p.procOutRecordsTotal.Add(int64(len(logGroup.Logs)))
}
if exitFlag {
return
Expand Down
11 changes: 6 additions & 5 deletions pluginmanager/aggregator_wrapper_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ func (p *AggregatorWrapperV2) Init(name string, pluginID string) error {
return nil
}

// AddWithWait inserts @loggroup to LogGroupsChan, and it waits at most @duration.
// It works like Add but adds a timeout policy when log group queue is full.
// It returns errAggAdd when queue is full and timeout.
// NOTE: no body calls it now.
func (p *AggregatorWrapperV2) Record(events *models.PipelineGroupEvents, context pipeline.PipelineContext) error {
return p.Aggregator.Record(events, context)
p.procInRecordsTotal.Add(int64(len(events.Events)))
err := p.Aggregator.Record(events, context)
if err == nil {
p.procOutRecordsTotal.Add(int64(len(events.Events)))
}
return err
}

func (p *AggregatorWrapperV2) GetResult(context pipeline.PipelineContext) error {
Expand Down
2 changes: 1 addition & 1 deletion pluginmanager/context_imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (p *ContextImp) GetExtension(name string, cfg any) (pipeline.Extension, err
}

// create if not found
typeWithID := genEmbeddedPluginName(getPluginType(name))
typeWithID := p.logstoreC.genEmbeddedPluginName(getPluginType(name))
rawType, pluginID := getPluginTypeAndID(typeWithID)
err := loadExtension(typeWithID, rawType, pluginID, p.logstoreC, cfg)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pluginmanager/flusher_wrapper_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func (wrapper *FlusherWrapperV1) Flush(projectName string, logstoreName string,
wrapper.procInRecordsTotal.Add(int64(total))
startTime := time.Now()
err := wrapper.Flusher.Flush(projectName, logstoreName, configName, logGroupList)
wrapper.procTimeMS.Add(int64(time.Since(startTime)))
if err == nil {
wrapper.procOutRecordsTotal.Add(int64(total))
}
wrapper.procTimeMS.Add(int64(time.Since(startTime).Microseconds()))

Check failure on line 69 in pluginmanager/flusher_wrapper_v1.go

View workflow job for this annotation

GitHub Actions / CI (1.19, ubuntu)

unnecessary conversion (unconvert)
return err
}
31 changes: 29 additions & 2 deletions pluginmanager/flusher_wrapper_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package pluginmanager

import (
"time"

"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
)
Expand All @@ -24,10 +27,34 @@ type FlusherWrapperV2 struct {
Flusher pipeline.FlusherV2
}

func (wrapper *FlusherWrapperV2) Init(name string, pluginID string) error {
func (wrapper *FlusherWrapperV2) Init(name string, pluginID string, context pipeline.PipelineContext) error {
labels := pipeline.GetCommonLabels(wrapper.Config.Context, name, pluginID)

wrapper.MetricRecord = wrapper.Config.Context.RegisterMetricRecord(labels)

wrapper.procInRecordsTotal = helper.NewCounterMetric("proc_in_records_total")
wrapper.procOutRecordsTotal = helper.NewCounterMetric("proc_out_records_total")
wrapper.procTimeMS = helper.NewCounterMetric("proc_time_ms")

wrapper.MetricRecord.RegisterCounterMetric(wrapper.procInRecordsTotal)
wrapper.MetricRecord.RegisterCounterMetric(wrapper.procOutRecordsTotal)
wrapper.MetricRecord.RegisterCounterMetric(wrapper.procTimeMS)

wrapper.Config.Context.SetMetricRecord(wrapper.MetricRecord)
return wrapper.Flusher.Init(wrapper.Config.Context)
}

func (wrapper *FlusherWrapperV2) Export(pipelineGroupEvents []*models.PipelineGroupEvents, pipelineContext pipeline.PipelineContext) error {
return wrapper.Flusher.Export(pipelineGroupEvents, pipelineContext)
total := 0
for _, groups := range pipelineGroupEvents {
total += len(groups.Events)
}
wrapper.procInRecordsTotal.Add(int64(total))
startTime := time.Now()
err := wrapper.Flusher.Export(pipelineGroupEvents, pipelineContext)
if err == nil {
wrapper.procOutRecordsTotal.Add(int64(total))
}
wrapper.procTimeMS.Add(int64(time.Since(startTime).Microseconds()))
return err
}
Binary file added pluginmanager/libPluginAdapter.so
Binary file not shown.
37 changes: 19 additions & 18 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type LogstoreConfig struct {
ContainerLabelSet map[string]struct{}
EnvSet map[string]struct{}
CollectContainersFlag bool
pluginID int64
}

func (p *LogstoreStatistics) Init(context pipeline.Context) {
Expand Down Expand Up @@ -517,7 +518,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
}
logger.Debug(contextImp.GetRuntimeContext(), "add extension", typeName)

pluginTypeWithID, rawPluginType, pluginID := genPluginTypeAndID(typeName)
pluginTypeWithID, rawPluginType, pluginID := logstoreC.genPluginTypeAndID(typeName)
err = loadExtension(pluginTypeWithID, rawPluginType, pluginID, logstoreC, extension["detail"])
if err != nil {
return nil, err
Expand All @@ -538,11 +539,11 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if _, isMetricInput := pipeline.MetricInputs[typeNameStr]; isMetricInput {
// Load MetricInput plugin defined in pipeline.MetricInputs
// pipeline.MetricInputs will be renamed in a future version
_, rawPluginType, pluginID := genPluginTypeAndID(typeNameStr)
_, rawPluginType, pluginID := logstoreC.genPluginTypeAndID(typeNameStr)
err = loadMetric(rawPluginType, pluginID, logstoreC, input["detail"])
} else if _, isServiceInput := pipeline.ServiceInputs[typeNameStr]; isServiceInput {
// Load ServiceInput plugin defined in pipeline.ServiceInputs
_, rawPluginType, pluginID := genPluginTypeAndID(typeNameStr)
_, rawPluginType, pluginID := logstoreC.genPluginTypeAndID(typeNameStr)
err = loadService(rawPluginType, pluginID, logstoreC, input["detail"])
}
if err != nil {
Expand Down Expand Up @@ -571,7 +572,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if typeName, ok := processor["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
logger.Debug(contextImp.GetRuntimeContext(), "add processor", typeNameStr)
_, rawPluginType, pluginID := genPluginTypeAndID(typeNameStr)
_, rawPluginType, pluginID := logstoreC.genPluginTypeAndID(typeNameStr)
err = loadProcessor(rawPluginType, pluginID, i, logstoreC, processor["detail"])
if err != nil {
return nil, err
Expand All @@ -598,7 +599,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if typeName, ok := aggregator["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", typeNameStr)
_, rawPluginType, pluginID := genPluginTypeAndID(typeNameStr)
_, rawPluginType, pluginID := logstoreC.genPluginTypeAndID(typeNameStr)
err = loadAggregator(rawPluginType, pluginID, logstoreC, aggregator["detail"])
if err != nil {
return nil, err
Expand All @@ -614,7 +615,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
return nil, fmt.Errorf("invalid aggregator type : %s, not json array", "aggregators")
}
}
if err = logstoreC.PluginRunner.AddDefaultAggregator(getPluginID()); err != nil {
if err = logstoreC.PluginRunner.AddDefaultAggregator(logstoreC.genEmbeddedPluginID()); err != nil {
return nil, err
}

Expand All @@ -628,7 +629,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if typeName, ok := flusher["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
logger.Debug(contextImp.GetRuntimeContext(), "add flusher", typeNameStr)
_, rawPluginType, pluginID := genPluginTypeAndID(typeNameStr)
_, rawPluginType, pluginID := logstoreC.genPluginTypeAndID(typeNameStr)
err = loadFlusher(rawPluginType, pluginID, logstoreC, flusher["detail"])
if err != nil {
return nil, err
Expand All @@ -644,7 +645,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
return nil, fmt.Errorf("invalid flusher type : %s, not json array", "flushers")
}
}
if err = logstoreC.PluginRunner.AddDefaultFlusher(getPluginID()); err != nil {
if err = logstoreC.PluginRunner.AddDefaultFlusher(logstoreC.genEmbeddedPluginID()); err != nil {
return nil, err
}
return logstoreC, nil
Expand Down Expand Up @@ -802,15 +803,10 @@ func applyPluginConfig(plugin interface{}, pluginConfig interface{}) error {
return err
}

func getPluginID() string {
id := atomic.AddInt64(&embeddedNamingCnt, 1)
return fmt.Sprintf("%v", id)
}

// genPluginTypeWithID extracts plugin type with pluginID from full pluginName.
// Rule: pluginName=pluginType/pluginID#pluginPriority.
// It returns the plugin type with pluginID.
func genPluginTypeAndID(pluginName string) (string, string, string) {
func (lc *LogstoreConfig) genPluginTypeAndID(pluginName string) (string, string, string) {
if isPluginTypeWithID(pluginName) {
pluginTypeWithID := pluginName
if idx := strings.IndexByte(pluginName, '#'); idx != -1 {
Expand All @@ -820,7 +816,7 @@ func genPluginTypeAndID(pluginName string) (string, string, string) {
return pluginTypeWithID, pluginTypeWithID[:ids], pluginTypeWithID[ids+1:]
}
}
id := getPluginID()
id := lc.genEmbeddedPluginID()
pluginTypeWithID := fmt.Sprintf("%s/%s", pluginName, id)
return pluginTypeWithID, pluginName, id
}
Expand Down Expand Up @@ -857,9 +853,14 @@ func GetPluginPriority(pluginName string) int {
return 0
}

func genEmbeddedPluginName(pluginType string) string {
id := atomic.AddInt64(&embeddedNamingCnt, 1)
return fmt.Sprintf("%s/_gen_embedded_%v", pluginType, id)
func (lc *LogstoreConfig) genEmbeddedPluginID() string {
id := atomic.AddInt64(&lc.pluginID, 1)
return fmt.Sprintf("_gen_embedded_%v", id)
}

func (lc *LogstoreConfig) genEmbeddedPluginName(pluginType string) string {
id := lc.genEmbeddedPluginID()
return fmt.Sprintf("%s/%s", pluginType, id)
}

func init() {
Expand Down
3 changes: 2 additions & 1 deletion pluginmanager/logstore_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ func TestLogstoreConfig_ProcessRawLogV2(t *testing.T) {
}

func Test_genEmbeddedPluginName(t *testing.T) {
result := genEmbeddedPluginName("testPlugin")
l := new(LogstoreConfig)
result := l.genEmbeddedPluginName("testPlugin")
assert.Regexp(t, `testPlugin/_gen_embedded_\d+`, result)
}
10 changes: 10 additions & 0 deletions pluginmanager/metric_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@

package pluginmanager

import (
"encoding/json"
"fmt"
)

func GetMetrics() []map[string]string {
metrics := make([]map[string]string, 0)
for _, config := range LogtailConfig {
metrics = append(metrics, config.Context.GetMetricRecords()...)
}
jsonData, err := json.Marshal(metrics)

if err == nil {
fmt.Println(string(jsonData))
}
return metrics
}
1 change: 0 additions & 1 deletion pluginmanager/metric_wrapper_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,5 @@ func (p *MetricWrapperV2) Init(name string, pluginID string, inputInterval int)
}

func (p *MetricWrapperV2) Read(pipelineContext pipeline.PipelineContext) error {
pipelineContext.Collector().Prepare(p.MetricRecord)
return p.Input.Read(pipelineContext)
}
2 changes: 1 addition & 1 deletion pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (p *pluginv2Runner) addFlusher(name string, pluginID string, flusher pipeli
wrapper.Flusher = flusher
wrapper.Interval = time.Millisecond * time.Duration(p.LogstoreConfig.GlobalConfig.FlushIntervalMs)
p.FlusherPlugins = append(p.FlusherPlugins, &wrapper)
return wrapper.Init(name, pluginID)
return wrapper.Init(name, pluginID, p.FlushPipeContext)
}

func (p *pluginv2Runner) addExtension(name string, extension pipeline.Extension) error {
Expand Down
2 changes: 1 addition & 1 deletion pluginmanager/processor_wrapper_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (wrapper *ProcessorWrapperV1) Process(logArray []*protocol.Log) []*protocol
wrapper.procInRecordsTotal.Add(int64(len(logArray)))
startTime := time.Now()
result := wrapper.Processor.ProcessLogs(logArray)
wrapper.procTimeMS.Add(int64(time.Since(startTime)))
wrapper.procTimeMS.Add(int64(time.Since(startTime).Microseconds()))

Check failure on line 59 in pluginmanager/processor_wrapper_v1.go

View workflow job for this annotation

GitHub Actions / CI (1.19, ubuntu)

unnecessary conversion (unconvert)
wrapper.procOutRecordsTotal.Add(int64(len(result)))
return result
}
19 changes: 19 additions & 0 deletions pluginmanager/processor_wrapper_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package pluginmanager

import (
"time"

"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
)
Expand All @@ -25,9 +28,25 @@ type ProcessorWrapperV2 struct {
}

func (wrapper *ProcessorWrapperV2) Init(name string, pluginID string) error {
labels := pipeline.GetCommonLabels(wrapper.Config.Context, name, pluginID)
wrapper.MetricRecord = wrapper.Config.Context.RegisterMetricRecord(labels)

wrapper.procInRecordsTotal = helper.NewCounterMetric("proc_in_records_total")
wrapper.procOutRecordsTotal = helper.NewCounterMetric("proc_out_records_total")
wrapper.procTimeMS = helper.NewCounterMetric("proc_time_ms")

wrapper.MetricRecord.RegisterCounterMetric(wrapper.procInRecordsTotal)
wrapper.MetricRecord.RegisterCounterMetric(wrapper.procOutRecordsTotal)
wrapper.MetricRecord.RegisterCounterMetric(wrapper.procTimeMS)

wrapper.Config.Context.SetMetricRecord(wrapper.MetricRecord)
return wrapper.Processor.Init(wrapper.Config.Context)
}

func (wrapper *ProcessorWrapperV2) Process(in *models.PipelineGroupEvents, context pipeline.PipelineContext) {
wrapper.procInRecordsTotal.Add(int64(len(in.Events)))
startTime := time.Now()
wrapper.Processor.Process(in, context)
wrapper.procTimeMS.Add(int64(time.Since(startTime).Microseconds()))

Check failure on line 50 in pluginmanager/processor_wrapper_v2.go

View workflow job for this annotation

GitHub Actions / CI (1.19, ubuntu)

unnecessary conversion (unconvert)
wrapper.procOutRecordsTotal.Add(int64(len(in.Events)))
}
Loading

0 comments on commit 2f01470

Please sign in to comment.