Skip to content

Commit

Permalink
fix query logs and ttl bug (#2778)
Browse files Browse the repository at this point in the history
  • Loading branch information
recallsong authored Nov 2, 2021
1 parent a7e0350 commit d969c73
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
3 changes: 0 additions & 3 deletions conf/monitor/monitor/monitor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ elasticsearch.index.retention-strategy@log:
elasticsearch.index.cleaner@log:
_enable: ${QUERY_LOG_FROM_ES_ENABLE:false}
check_interval: "30m"
print_onluy: true
disk_clean:
enable: ${LOG_DISK_CLEAN_ENABLE:true}
check_interval: "1m"
Expand Down Expand Up @@ -148,7 +147,6 @@ elasticsearch.index.retention-strategy@event:
elasticsearch.index.cleaner@event:
_enable: ${QUERY_EVENT_FROM_ES_ENABLE:false}
check_interval: "30m"
print_onluy: true
disk_clean:
enable: ${EVENT_DISK_CLEAN_ENABLE:true}
check_interval: "1m"
Expand Down Expand Up @@ -197,7 +195,6 @@ elasticsearch.index.retention-strategy@metric:
- "spot-<metric>-<namespace>.<key>-<timestamp>"
elasticsearch.index.cleaner@metric:
check_interval: "30m"
print_onluy: true
disk_clean:
enable: ${METRIC_DISK_CLEAN_ENABLE:true}
check_interval: "1m"
Expand Down
12 changes: 5 additions & 7 deletions modules/core/monitor/log/query/log.query.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *logQueryService) queryLogItems(ctx context.Context, req Request, fn fun
return items, nil
}

func (s *logQueryService) walkLogItems(ctx context.Context, req Request, fn func(sel *storage.Selector) *storage.Selector, walk func(item *pb.LogItem) error) error {
func (s *logQueryService) walkLogItems(ctx context.Context, req Request, fn func(sel *storage.Selector) (*storage.Selector, error), walk func(item *pb.LogItem) error) error {
if req.GetCount() < 0 {
return errors.NewInvalidParameterError("count", "not allowed negative")
}
Expand All @@ -108,23 +108,21 @@ func (s *logQueryService) walkLogItems(ctx context.Context, req Request, fn func
return err
}
if fn != nil {
fn(sel)
sel, err = fn(sel)
if err != nil {
return err
}
}
it, err := s.getIterator(ctx, sel, req.GetLive())
if err != nil {
return errors.NewInternalServerError(err)
}
defer it.Close()
num, limit := 0, getLimit(req.GetCount())
for it.Next() {
if num >= limit {
break
}
log, ok := it.Value().(*pb.LogItem)
if !ok {
continue
}
num++
err := walk(log)
if err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions modules/core/monitor/log/query/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"regexp"
"time"

"github.com/recallsong/go-utils/reflectx"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/erda-project/erda/modules/core/monitor/log/storage"
"github.com/erda-project/erda/modules/monitor/common"
"github.com/erda-project/erda/modules/monitor/common/permission"
"github.com/erda-project/erda/pkg/common/errors"
api "github.com/erda-project/erda/pkg/common/httpapi"
)

Expand Down Expand Up @@ -80,6 +82,8 @@ func (r *LogRequest) GetDebug() bool { return r.Debug }

var lineBreak = []byte("\n")

const maxDownloadTimeRange = 1 * int64(time.Hour)

func (p *provider) downloadLog(w http.ResponseWriter, r *http.Request, req *LogRequest) interface{} {
filename := getFilename(req)
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
Expand All @@ -92,7 +96,10 @@ func (p *provider) downloadLog(w http.ResponseWriter, r *http.Request, req *LogR

var count int
err := p.logQueryService.walkLogItems(r.Context(), req,
func(sel *storage.Selector) *storage.Selector {
func(sel *storage.Selector) (*storage.Selector, error) {
if sel.End-sel.Start > maxDownloadTimeRange {
return sel, errors.NewInvalidParameterError("(start,end]", "time range is too large for download")
}
if len(req.ClusterName) > 0 {
sel.Filters = append(sel.Filters, &storage.Filter{
Key: "tags.dice_cluster_name",
Expand All @@ -107,7 +114,7 @@ func (p *provider) downloadLog(w http.ResponseWriter, r *http.Request, req *LogR
Value: req.ApplicationID,
})
}
return sel
return sel, nil
},
func(item *pb.LogItem) error {
w.Write(reflectx.StringToBytes(item.Content))
Expand Down
46 changes: 41 additions & 5 deletions modules/core/monitor/log/storage/elasticsearch/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"

"github.com/olivere/elastic"
Expand All @@ -33,12 +34,38 @@ import (
)

const useScrollQuery = false
const useInMemContentFilter = true

func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (storekit.Iterator, error) {
// TODO check org
indices := p.Loader.Indices(ctx, sel.Start, sel.End, loader.KeyPath{
Recursive: true,
})
var matcher func(data *pb.LogItem) bool
if useInMemContentFilter {
for _, filter := range sel.Filters {
val, _ := filter.Value.(string)
if filter.Key != "content" || len(val) <= 0 {
continue
}
switch filter.Op {
case storage.EQ:
matcher = func(data *pb.LogItem) bool {
return data.Content == val
}
case storage.REGEXP:
regex, err := regexp.Compile(val)
if err != nil {
p.Log.Debugf("invalid regexp %q", val)
return storekit.EmptyIterator{}, nil
}
matcher = func(data *pb.LogItem) bool {
return regex.MatchString(data.Content)
}
}
}
}

if useScrollQuery {
searchSource := getSearchSource(sel.Start, sel.End, sel)
if sel.Debug {
Expand All @@ -58,7 +85,7 @@ func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (storeki
},
},
func() (*elastic.SearchSource, error) { return searchSource, nil },
decodeFunc(sel.Start, sel.End),
decodeFunc(sel.Start, sel.End, matcher),
)
}
if sel.Debug {
Expand All @@ -81,7 +108,7 @@ func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (storeki
func() (*elastic.SearchSource, error) {
return getSearchSource(sel.Start, sel.End, sel), nil
},
decodeFunc(sel.Start, sel.End),
decodeFunc(sel.Start, sel.End, matcher),
)
}

Expand All @@ -93,6 +120,11 @@ func getSearchSource(start, end int64, sel *storage.Selector) *elastic.SearchSou
if !ok {
continue
}
if useInMemContentFilter {
if filter.Key == "content" {
continue
}
}
switch filter.Op {
case storage.EQ:
query = query.Filter(elastic.NewTermQuery(filter.Key, val))
Expand All @@ -105,7 +137,7 @@ func getSearchSource(start, end int64, sel *storage.Selector) *elastic.SearchSou

var skip = errors.New("skip")

func decodeFunc(start, end int64) func(body []byte) (interface{}, error) {
func decodeFunc(start, end int64, matcher func(data *pb.LogItem) bool) func(body []byte) (interface{}, error) {
return func(body []byte) (interface{}, error) {
var data log.Log
err := json.Unmarshal(body, &data)
Expand All @@ -115,7 +147,7 @@ func decodeFunc(start, end int64) func(body []byte) (interface{}, error) {
if data.Timestamp < start || data.Timestamp >= end {
return nil, skip
}
return &pb.LogItem{
item := &pb.LogItem{
Source: data.Source,
Id: data.ID,
Stream: data.Stream,
Expand All @@ -124,6 +156,10 @@ func decodeFunc(start, end int64) func(body []byte) (interface{}, error) {
Content: data.Content,
Level: data.Tags["level"],
RequestId: data.Tags["request_id"],
}, nil
}
if matcher != nil && !matcher(item) {
return nil, skip
}
return item, nil
}
}
3 changes: 0 additions & 3 deletions modules/core/monitor/settings/settings.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,6 @@ func getValue(typ string, value interface{}) *structpb.Value {
}

func (s *settingsService) getOrgName(id int64) (string, error) {
if true {
return "terminus", nil
}
resp, err := s.bundle.GetOrg(int(id))
if err != nil {
return "", fmt.Errorf("fail to get orgName: %s", err)
Expand Down

0 comments on commit d969c73

Please sign in to comment.