Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat msp storage switch to es #2861

Merged
merged 10 commits into from
Nov 5, 2021
4 changes: 3 additions & 1 deletion cmd/monitor/streaming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
_ "github.com/erda-project/erda/modules/core/monitor/storekit/kafka/topic/initializer"
_ "github.com/erda-project/erda/modules/monitor/notify/storage/notify-record"
_ "github.com/erda-project/erda/modules/msp/apm/browser"
_ "github.com/erda-project/erda/modules/msp/apm/trace/storage"
_ "github.com/erda-project/erda/modules/msp/apm/trace/persist"
_ "github.com/erda-project/erda/modules/msp/apm/trace/storage/cassandra_v1"
_ "github.com/erda-project/erda/modules/msp/apm/trace/storage/elasticsearch"

// providers
_ "github.com/erda-project/erda-infra/providers"
Expand Down
12 changes: 10 additions & 2 deletions cmd/msp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ import (
_ "github.com/erda-project/erda-infra/providers"
_ "github.com/erda-project/erda-infra/providers/cassandra"
_ "github.com/erda-project/erda-proto-go/core/monitor/alert/client"
_ "github.com/erda-project/erda-proto-go/core/monitor/event/client"
_ "github.com/erda-project/erda-proto-go/core/monitor/metric/client"
_ "github.com/erda-project/erda-proto-go/oap/entity/client"

_ "github.com/erda-project/erda-infra/providers/grpcclient"
_ "github.com/erda-project/erda-proto-go/core/services/authentication/credentials/accesskey/client"
_ "github.com/erda-project/erda/modules/core/monitor/settings"
_ "github.com/erda-project/erda/modules/core/monitor/settings/retention-strategy"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/cleaner"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/loader"
_ "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/retention-strategy"
_ "github.com/erda-project/erda/modules/msp/apm/adapter"
_ "github.com/erda-project/erda/modules/msp/apm/alert"
_ "github.com/erda-project/erda/modules/msp/apm/checker/apis"
Expand All @@ -39,10 +46,11 @@ import (
_ "github.com/erda-project/erda/modules/msp/apm/checker/task"
_ "github.com/erda-project/erda/modules/msp/apm/checker/task/fetcher/fixed"
_ "github.com/erda-project/erda/modules/msp/apm/checker/task/fetcher/scheduled"
_ "github.com/erda-project/erda/modules/msp/apm/exception"
_ "github.com/erda-project/erda/modules/msp/apm/exception/query"
_ "github.com/erda-project/erda/modules/msp/apm/metric"
_ "github.com/erda-project/erda/modules/msp/apm/notifygroup"
_ "github.com/erda-project/erda/modules/msp/apm/trace"
_ "github.com/erda-project/erda/modules/msp/apm/trace/query"
_ "github.com/erda-project/erda/modules/msp/apm/trace/storage/elasticsearch"
_ "github.com/erda-project/erda/modules/msp/configcenter"
_ "github.com/erda-project/erda/modules/msp/credential"
_ "github.com/erda-project/erda/modules/msp/instance/permission"
Expand Down
63 changes: 63 additions & 0 deletions conf/monitor/streaming/span_index_template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"index_patterns": [
"${ERDA_SPAN_INDEX_PREFIX:erda-spans-}*"
],
"settings": {
"number_of_shards": ${SPAN_INDEX_SHARDS:5},
"number_of_replicas": ${SPAN_INDEX_REPLICAS:1},
"index": {
"refresh_interval": "15s",
"translog.durability": "async",
"translog.sync_interval": "15s",
"translog.flush_threshold_size": "1024mb"
}
},
"mappings": {
"spans": {
"dynamic": "true",
"properties": {
"trace_id": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"span_id": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"parent_span_id": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"operation_name": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"start_time": {
"type": "date"
},
"end_time": {
"type": "date"
},
"tags": {
"type": "object"
}
}
}
}
}
78 changes: 77 additions & 1 deletion conf/monitor/streaming/streaming.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,82 @@ metric-persist:
generate_meta: true
machine_summary: true

# elasticsearch for span
elasticsearch@span:
_enable: ${SPAN_ELASTICSEARCH_ENABLE:false}
urls: "${SPAN_ELASTICSEARCH_URL:http://localhost:9200}"
security: ${SPAN_ELASTICSEARCH_SECURITY_ENABLE:false}
username: "${SPAN_ELASTICSEARCH_SECURITY_USERNAME}"
password: "${SPAN_ELASTICSEARCH_SECURITY_PASSWORD}"

elasticsearch.index.initializer@span:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
templates:
- name: "erda-spans"
path: "${CONFIG_PATH}/span_index_template.json"

elasticsearch.index.loader@span:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
load_mode: "LoadFromElasticSearchOnly"
index_reload_interval: "1m"
match:
- prefix: "erda-spans-"
patterns:
- "<org>-{number}"
- "<org>.<key>-{number}"

elasticsearch.index.creator@span:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
patterns:
- first_index: "erda-spans-<org>-000001"
alias: "erda-spans-<org>-rollover"
- first_index: "erda-spans-<org>.<key>-000001"
alias: "erda-spans-<org>.<key>-rollover"
remove_conflicting_indices: true

elasticsearch.index.rollover@span:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
check_interval: "30s"
body_file: "${CONFIG_PATH}/index_rollover.json"
patterns:
- index: "erda-spans-<org>-{number}"
alias: "erda-spans-<org>-rollover"
- index: "erda-spans-<org>.<key>-{number}"
alias: "erda-spans-<org>.<key>-rollover"

storage-retention-strategy@span:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
load_from_database: true
ttl_reload_interval: "3m"
default_ttl: "${LOG_TTL:168h}"

span-storage-elasticsearch:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
write_timeout: "1m"
index_type: "spans"

span-persist:
_enable: ${WRITE_SPAN_TO_ES_ENABLE:false}
spot_input:
topics: "${SPOT_TRACE_TOPICS:spot-trace}"
group: "${TRACE_GROUP_ID:spot-monitor-trace-dev}"
parallelism: ${SPOT_SPOTSPAN_CONSUMERS:3}
options:
auto.offset.reset: "${KAFKA_AUTO_OFFSET_RESET:latest}"
auto.commit.interval.ms: "${KAFKA_AUTO_COMMIT_INTERVAL_MS:1000}"
oap_input:
topics: "${OAP_TRACE_TOPICS:msp-jaeger-trace}"
group: "${TRACE_GROUP_ID:spot-monitor-trace-dev}"
parallelism: ${SPOT_OAPSPAN_CONSUMERS:3}
options:
auto.offset.reset: "${KAFKA_AUTO_OFFSET_RESET:latest}"
auto.commit.interval.ms: "${KAFKA_AUTO_COMMIT_INTERVAL_MS:1000}"
id_keys: "${SPAN_ID_KEYS:TERMINUS_DEFINE_TAG,terminus_define_tag,MESOS_TASK_ID,mesos_task_id}"
read_timeout: "5s"
buffer_size: ${SPAN_BATCH_SIZE:50}
parallelism: ${SPAN_PERSIST_PARALLELISM:3}
print_invalid_span: false

browser-analytics:
_enable: ${BROWSER_ENABLE:true}
input:
Expand All @@ -300,7 +376,7 @@ browser-analytics:
ipdb: "${CONFIG_PATH}/ipdata.dat"

trace-storage:
_enable: ${TRACE_ENABLE:true}
_enable: ${WRITE_SPAN_TO_CASSANDRA_ENABLE:true}
spot_input:
topics: "${SPOT_TRACE_TOPICS:spot-trace}"
group: "${TRACE_GROUP_ID:spot-monitor-trace-dev}"
Expand Down
5 changes: 5 additions & 0 deletions conf/msp/index_rollover_min.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"conditions": {
"max_size": "${INDEX_ROLLOVER_MIN_SIZE:256mb}"
}
}
65 changes: 63 additions & 2 deletions conf/msp/msp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,57 @@ cassandra:
password: ${CASSANDRA_SECURITY_PASSWORD}
timeout: "${CASSANDRA_TIMEOUT:3s}"

# span
elasticsearch@span:
_enable: ${SPAN_ELASTICSEARCH_ENABLE:false}
urls: "${SPAN_ES_URL:http://localhost:9200}"
security: ${SPAN_ES_SECURITY_ENABLE:false}
username: "${SPAN_ES_SECURITY_USERNAME}"
password: "${SPAN_ES_SECURITY_PASSWORD}"

elasticsearch.index.loader@span:
_enable: ${QUERY_SPAN_FROM_ES_ENABLE:true}
load_mode: "LoadFromElasticSearchOnly"
index_reload_interval: "1m"
query_index_time_range: true
cache_key_prefix: "es-index-span"
match:
- prefix: "erda-spans-"
patterns:
- "<org>-{number}"
- "<org>.<key>-{number}"
storage-retention-strategy@span:
_enable: ${QUERY_SPAN_FROM_ES_ENABLE:true}
default_ttl: "${SPAN_TTL:168h}"
load_from_database: false
ttl_reload_interval: "3m"
elasticsearch.index.retention-strategy@span:
_enable: ${QUERY_SPAN_FROM_ES_ENABLE:true}
key_patterns:
- "erda-spans-<org>.<key>-{number}"
elasticsearch.index.cleaner@span:
_enable: ${QUERY_SPAN_FROM_ES_ENABLE:true}
check_interval: "30m"
print_onluy: true
disk_clean:
enable: ${SPAN_DISK_CLEAN_ENABLE:true}
check_interval: "1m"
high_disk_usage_percent: ${HIGH_DISK_USAGE_PERCENT:80} # 触发磁盘清理的容量占比
low_disk_usage_percent: ${LOW_DISK_USAGE_PERCENT:70} # 触发磁盘清理时,尽量清理到的目标容量占比
min_indices_store: "${MIN_INDICES_STORE_PERCENT:10GB}" # 磁盘使用率高时,保证 索引最少可以占用的容量
min_indices_store_percent: ${MIN_INDICES_STORE_PERCENT:10} # 磁盘使用率高时,保证 索引最少可以占用总磁盘总量的百分比
rollover_body_file: "${CONFIG_PATH}/index_rollover_min.json"
rollover_alias_patterns:
- index: "erda-spans-<org>-{number}"
alias: "erda-spans-<org>-rollover"
- index: "erda-spans-<org>.<key>-{number}"
alias: "erda-spans-<org>.<key>-rollover"
span-storage-elasticsearch:
_enable: ${QUERY_SPAN_FROM_ES_ENABLE:true}
query_timeout: "1m"
read_page_size: 200


i18n:
common:
- conf/common/i18n/common.yml
Expand All @@ -70,6 +121,14 @@ grpc-client@erda.core.monitor.alert:
addr: "${MONITOR_GRPC_ADDR:monitor:7080}"
erda.core.monitor.alert-client:

grpc-client@erda.core.monitor.event:
addr: "${MONITOR_GRPC_ADDR:monitor:7080}"
# addr: "${MONITOR_GRPC_ADDR_LOCAL:localhost:7080}"
erda.core.monitor.event-client:

grpc-client@erda.oap.entity:
addr: "${MONITOR_GRPC_ADDR:monitor:7080}"
erda.oap.entity-client:

erda.msp.apm.alert:
micro_service_filter_tags: "${MICRO_SERVICE_FILTER_TAGS:_metric_name,_metric_scope,_metric_scope_id}"
Expand Down Expand Up @@ -126,13 +185,15 @@ erda.msp.apm.checker.task.plugins.http:
erda.msp.apm.checker.task:
default_periodic_worker_interval: "30s"

erda.msp.apm.trace:
erda.msp.apm.trace.query:
query_source: "${TRACE_QUERY_SOURCE:cassandra,elasticsearch}"
cassandra:
keyspace:
name: "spot_prod"
auto: false # auto generate keyspace

erda.msp.apm.exception:
erda.msp.apm.exception.query:
query_source: "${EXCEPTION_QUERY_SOURCE:cassandra,elasticsearch}"
cassandra:
keyspace:
name: "spot_prod"
Expand Down
65 changes: 65 additions & 0 deletions modules/msp/apm/exception/erda-error/persist/consume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package persist

import (
"encoding/json"
"time"

"github.com/erda-project/erda/modules/msp/apm/exception"
)

func (p *provider) decodeError(key, value []byte, topic *string, timestamp time.Time) (interface{}, error) {
data := &exception.Erda_error{}
if err := json.Unmarshal(value, data); err != nil {
p.stats.DecodeError(value, err)
if p.Cfg.PrintInvalidError {
p.Log.Warnf("unknown format error data: %s", string(value))
} else {
p.Log.Warnf("failed to decode error: %v", err)
}
return nil, err
}

if err := p.validator.Validate(data); err != nil {
p.stats.ValidateError(data)
if p.Cfg.PrintInvalidError {
p.Log.Warnf("invalid error data: %s", string(value))
} else {
p.Log.Warnf("invalid error: %v", err)
}
return nil, err
}
if err := p.metadata.Process(data); err != nil {
p.stats.MetadataError(data, err)
p.Log.Errorf("failed to process error metadata: %v", err)
}
return data, nil
}

func (p *provider) handleReadError(err error) error {
p.Log.Errorf("failed to read error from kafka: %s", err)
return nil // return nil to continue read
}

func (p *provider) handleWriteError(list []interface{}, err error) error {
p.Log.Errorf("failed to write error into storage: %s", err)
return nil // return nil to continue consume
}

func (p *provider) confirmErrorHandler(err error) error {
p.Log.Errorf("failed to confirm error from kafka: %s", err)
return err // return error to exit
}
34 changes: 34 additions & 0 deletions modules/msp/apm/exception/erda-error/persist/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package persist

import (
"github.com/erda-project/erda/modules/msp/apm/exception"
)

type MetadataProcessor interface {
Process(data *exception.Erda_error) error
}

func newMetadataProcessor(cfg *config) MetadataProcessor {
return NopMetadataProcessor
}

type nopMetadataProcessor struct{}

func (*nopMetadataProcessor) Process(data *exception.Erda_error) error { return nil }

// NopMetadataProcessor .
var NopMetadataProcessor MetadataProcessor = &nopMetadataProcessor{}
Loading