Skip to content

Commit

Permalink
feature: implement log sequential search (#2373) (#2410)
Browse files Browse the repository at this point in the history
* feature: implement log sequential search

* optimize: timestampString rename to timestampNanos

* optimize: add unit tests.

* optimize: add unit tests.

* optimize: add unit tests.

Co-authored-by: 郭刚平 <512979011@qq.com>
  • Loading branch information
erda-bot and snakorse authored Oct 15, 2021
1 parent 701c281 commit 3f0e542
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 20 deletions.
5 changes: 5 additions & 0 deletions conf/monitor/monitor/logs/default_field_settings.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
fields:
- field_name: tags.level
support_aggregation: true
display: true
group: 0
allow_edit: true
- field_name: source
support_aggregation: false
display: false
Expand Down
17 changes: 9 additions & 8 deletions modules/core/monitor/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ package log

// Log is a single log record as stored in / read from Elasticsearch.
// The diff artifact duplicated the old and new field sets; this is the
// deduplicated post-commit definition.
type Log struct {
	DocId   string `json:"_id"`    // Elasticsearch document id
	Source  string `json:"source"` // log source (e.g. container, job)
	ID      string `json:"id"`
	Stream  string `json:"stream"` // stdout / stderr
	Content string `json:"content"`
	Offset  int64  `json:"offset"` // byte offset within the stream, used as a sort tiebreaker
	// Timestamp is exposed in milliseconds to API consumers; TimestampNanos
	// preserves the original nanosecond precision (as a string, since
	// int64 nanos can exceed the safe integer range of JSON/JS numbers).
	Timestamp      int64             `json:"timestamp"`
	TimestampNanos string            `json:"timestampNanos"`
	Tags           map[string]string `json:"tags"`
}

// LogMeta .
Expand Down
18 changes: 12 additions & 6 deletions modules/extensions/loghub/index/query/log_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type LogRequest struct {
Query string
Debug bool
Lang i18n.LanguageCodes
TimeScale time.Duration
}

type LogDownloadRequest struct {
Expand All @@ -67,10 +68,11 @@ type LogDownloadRequest struct {
// LogSearchRequest is a paged log search request. The diff artifact
// duplicated the old and new field sets; this is the deduplicated
// post-commit definition.
type LogSearchRequest struct {
	LogRequest
	Page      int64
	Size      int64
	Sort      []string
	Highlight bool
	// SearchAfter carries the Elasticsearch search_after cursor values;
	// they must match the Sort fields in number and order.
	SearchAfter []interface{}
}

// LogStatisticRequest .
Expand Down Expand Up @@ -209,6 +211,10 @@ func (c *ESClient) getSearchSource(req *LogSearchRequest, boolQuery *elastic.Boo
Field("*"))
}

if len(req.SearchAfter) > 0 {
searchSource.SearchAfter(req.SearchAfter...)
}

// max allowed size limit to 10000
size := req.Size
if req.Page*req.Size > 10000 {
Expand Down Expand Up @@ -250,8 +256,8 @@ func (c *ESClient) getSort(searchSource *elastic.SearchSource, sorts []string) *
func (c *ESClient) filterIndices(req *LogRequest) []string {
var indices []string
if len(req.Addon) > 0 {
start := req.Start * int64(time.Millisecond)
end := req.End * int64(time.Millisecond)
start := req.Start * int64(req.TimeScale)
end := req.End * int64(req.TimeScale)
for _, entry := range c.Entrys {
if (entry.MinTS == 0 || entry.MinTS <= end) &&
(entry.MaxTS == 0 || entry.MaxTS >= start) {
Expand Down
70 changes: 70 additions & 0 deletions modules/extensions/loghub/index/query/log_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package query
import (
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -258,6 +259,75 @@ func Test_getSearchSource_Should_Sort_As_Expect(t *testing.T) {
}
}

// Test_filterIndices_With_Addon_Should_Filter_Success verifies that
// filterIndices keeps only index entries whose [MinTS, MaxTS] window
// overlaps the request's [Start, End] range scaled by TimeScale.
func Test_filterIndices_With_Addon_Should_Filter_Success(t *testing.T) {
	c := &ESClient{
		Entrys: []*IndexEntry{
			{
				Index: "index-1",
				MinTS: 1 * int64(time.Millisecond),
				MaxTS: 20 * int64(time.Millisecond),
			},
			{
				// Open-ended MaxTS, but MinTS starts after the request window,
				// so this entry must be filtered out.
				Index: "index-2",
				MinTS: 20 * int64(time.Millisecond),
			},
		},
	}
	req := &LogRequest{
		Start:       1,
		End:         10,
		TimeScale:   time.Millisecond,
		Addon:       "addon-1",
		ClusterName: "cluster-1",
	}
	want := []string{"index-1"}

	indices := c.filterIndices(req)
	// Fatalf (not Errorf): continuing with a shorter slice would panic
	// with an index-out-of-range in the loop below. Also fixes the
	// "filterd" typo in the failure messages.
	if len(indices) != len(want) {
		t.Fatalf("filtered indices failed, expect len: %d, but got len: %d", len(want), len(indices))
	}
	for i, index := range want {
		if indices[i] != index {
			t.Errorf("filtered indices assert failed, expect: %s", index)
		}
	}
}

// Test_getSearchSource_Should_Include_SearchAfter checks that the
// SearchAfter cursor values on the request are propagated verbatim into
// the serialized search source under the "search_after" key.
func Test_getSearchSource_Should_Include_SearchAfter(t *testing.T) {
	client := &ESClient{}
	req := &LogSearchRequest{
		SearchAfter: []interface{}{"12343434", 123, 123},
	}

	src, err := client.getSearchSource(req, elastic.NewBoolQuery()).Source()
	if err != nil {
		t.Errorf("should not error getting serialized search source")
	}

	got := src.(map[string]interface{})["search_after"].([]interface{})
	if len(got) != len(req.SearchAfter) {
		t.Errorf("search_after generated not as expect, expect len: %d, but got len: %d", len(req.SearchAfter), len(got))
	}
	for i := range got {
		if got[i] != req.SearchAfter[i] {
			t.Errorf("search_after generated not as expect")
		}
	}
}

// Test_getBoolQueryV2_Should_Work_As_Expect asserts that getBoolQueryV2
// scales Start/End by TimeScale (here milliseconds -> nanoseconds) when
// building the inclusive timestamp range filter.
func Test_getBoolQueryV2_Should_Work_As_Expect(t *testing.T) {
	client := &ESClient{}
	request := LogRequest{
		Start:     1,
		End:       10,
		TimeScale: time.Millisecond,
	}

	source, _ := client.getBoolQueryV2(&request).Source()
	rendered := fmt.Sprintf("%+v", source)

	want := "map[range:map[timestamp:map[from:1000000 include_lower:true include_upper:true to:10000000]]]"
	if !strings.Contains(rendered, want) {
		t.Errorf("expect: %s, but got: %s", want, rendered)
	}
}

func Test_aggregateFields_With_ValidParams_Should_Success(t *testing.T) {
c := &ESClient{}
want := struct {
Expand Down
21 changes: 15 additions & 6 deletions modules/extensions/loghub/index/query/log_query_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package query
import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/olivere/elastic"
Expand All @@ -26,9 +27,16 @@ import (

func (c *ESClient) getBoolQueryV2(req *LogRequest) *elastic.BoolQuery {
boolQuery := c.getTagsBoolQuery(req)
start := req.Start * int64(time.Millisecond)
end := req.End * int64(time.Millisecond)
boolQuery = boolQuery.Filter(elastic.NewRangeQuery("timestamp").Gte(start).Lte(end))
start := req.Start * int64(req.TimeScale)
end := req.End * int64(req.TimeScale)
timeRangeQuery := elastic.NewRangeQuery("timestamp")
if start > 0 {
timeRangeQuery.Gte(start)
}
if end > 0 {
timeRangeQuery.Lte(end)
}
boolQuery = boolQuery.Filter(timeRangeQuery)
if len(req.Query) > 0 {
//byts, _ := json.Marshal(req.Query)
boolQuery = boolQuery.Filter(elastic.NewQueryStringQuery(req.Query).DefaultField("content").DefaultOperator("AND"))
Expand Down Expand Up @@ -63,6 +71,7 @@ func (c *ESClient) searchLogsV2(req *LogSearchRequest, timeout time.Duration) (*
}
c.setModule(&log)
log.DocId = hit.Id
log.TimestampNanos = strconv.FormatInt(log.Timestamp, 10)
log.Timestamp = log.Timestamp / int64(time.Millisecond)
item := &LogItem{Source: &log, Highlight: map[string][]string(hit.Highlight)}
if item.Highlight != nil {
Expand Down Expand Up @@ -90,9 +99,9 @@ func (c *ESClient) statisticLogsV2(req *LogStatisticRequest, timeout time.Durati
}

intervalMillisecond := interval
start := req.Start * int64(time.Millisecond)
end := req.End * int64(time.Millisecond)
interval = interval * int64(time.Millisecond)
start := req.Start * int64(req.TimeScale)
end := req.End * int64(req.TimeScale)
interval = interval * int64(req.TimeScale)
boundEnd := end - (end-start)%interval
if boundMod := (end - start) % interval; boundMod == 0 {
boundEnd = boundEnd - interval
Expand Down
55 changes: 55 additions & 0 deletions modules/extensions/loghub/index/query/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (p *provider) intRoutes(routes httpserver.Router) error {
// 项目 + env 日志查询
routes.GET("/api/micro_service/:addon/logs/statistic/histogram", p.logStatistic)
routes.GET("/api/micro_service/:addon/logs/search", p.logSearch)
routes.GET("/api/micro_service/:addon/logs/sequentialSearch", p.logSequentialSearch)
routes.GET("/api/micro_service/logs/tags/tree", p.logMSTagsTree)
routes.GET("/api/micro_service/:addon/logs/fields", p.logFields)
routes.GET("/api/micro_service/:addon/logs/fields/aggregation", p.logFieldsAggregation)
Expand Down Expand Up @@ -102,6 +103,7 @@ func (p *provider) logStatistic(r *http.Request, params struct {
Addon: params.Addon,
Start: params.Start,
End: params.End,
TimeScale: time.Millisecond,
Filters: filters,
Query: params.Query,
Debug: params.Debug,
Expand Down Expand Up @@ -150,6 +152,7 @@ func (p *provider) logFieldsAggregation(r *http.Request, params struct {
Addon: params.Addon,
Start: params.Start,
End: params.End,
TimeScale: time.Millisecond,
Filters: filters,
Query: params.Query,
Debug: params.Debug,
Expand Down Expand Up @@ -217,6 +220,7 @@ func (p *provider) logDownload(r *http.Request, w http.ResponseWriter, params st
Addon: params.Addon,
Start: params.Start,
End: params.End,
TimeScale: time.Millisecond,
Filters: filters,
Query: params.Query,
Debug: params.Debug,
Expand All @@ -242,6 +246,56 @@ func (p *provider) logDownload(r *http.Request, w http.ResponseWriter, params st
return nil
}

// logSequentialSearch serves GET .../logs/sequentialSearch. It returns up
// to Count log entries strictly after (asc) or before (desc) the anchor
// log identified by (timestampNanos, id, offset), using Elasticsearch
// search_after paging so results continue exactly from the anchor.
func (p *provider) logSequentialSearch(r *http.Request, params struct {
	TimestampNanos int64  `query:"timestampNanos"`
	Id             string `query:"id"`
	Offset         int64  `query:"offset"`
	Count          int64  `query:"count"`
	Query          string `query:"query"`
	Sort           string `query:"sort"`
	Debug          bool   `query:"debug"`
	Addon          string `param:"addon"`
	ClusterName    string `query:"clusterName"`
}) interface{} {
	orgID := api.OrgID(r)
	orgid, err := strconv.ParseInt(orgID, 10, 64)
	if err != nil {
		return api.Errors.InvalidParameter("invalid Org-ID")
	}
	// Default page size when the caller omits count or sends a non-positive value.
	if params.Count <= 0 {
		params.Count = 20
	}
	// Fix: Sort was previously interpolated unvalidated; an empty or
	// arbitrary value produced malformed sort clauses such as "timestamp ".
	// Only "asc" and "desc" are meaningful; default to ascending.
	if params.Sort != "asc" && params.Sort != "desc" {
		params.Sort = "asc"
	}
	filters := p.buildLogFilters(r)
	// Ascending searches forward from the anchor (open-ended end);
	// descending searches backward from it (open-ended start).
	start, end := params.TimestampNanos, int64(0)
	if params.Sort == "desc" {
		start, end = end, start
	}
	// search_after below must match these sort keys in number and order.
	sorts := []string{"timestamp " + params.Sort, "id " + params.Sort, "offset " + params.Sort}

	logs, err := p.SearchLogs(&LogSearchRequest{
		LogRequest: LogRequest{
			OrgID:       orgid,
			ClusterName: params.ClusterName,
			Addon:       params.Addon,
			Start:       start,
			End:         end,
			TimeScale:   time.Nanosecond, // anchor timestamp is in nanoseconds
			Filters:     filters,
			Query:       params.Query,
			Debug:       params.Debug,
			Lang:        api.Language(r),
		},
		Page:        1,
		Size:        params.Count,
		Sort:        sorts,
		SearchAfter: []interface{}{params.TimestampNanos, params.Id, params.Offset},
	})
	if err != nil {
		return api.Errors.Internal(err)
	}
	return api.Success(logs)
}

func (p *provider) logSearch(r *http.Request, params struct {
Start int64 `query:"start" validate:"gte=1"`
End int64 `query:"end" validate:"gte=1"`
Expand Down Expand Up @@ -274,6 +328,7 @@ func (p *provider) logSearch(r *http.Request, params struct {
Addon: params.Addon,
Start: params.Start,
End: params.End,
TimeScale: time.Millisecond,
Filters: filters,
Query: params.Query,
Debug: params.Debug,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package monitor

import "github.com/erda-project/erda/modules/openapi/api/apis"

// MONITOR_ADDON_LOGS_SEARCH_SEQUENTIAL exposes the addon log sequential
// search endpoint through the openapi gateway, proxying the public
// log-analytics path to the monitor service backend route.
// Doc is the user-facing summary (Chinese: "log sequential search API").
var MONITOR_ADDON_LOGS_SEARCH_SEQUENTIAL = apis.ApiSpec{
Path: "/api/log-analytics/<addon>/sequentialSearch",
BackendPath: "/api/micro_service/<addon>/logs/sequentialSearch",
Host: "monitor.marathon.l4lb.thisdcos.directory:7096",
Scheme: "http",
Method: "GET",
CheckLogin: true,
CheckToken: true,
Doc: "summary: 日志连续搜索接口",
}

0 comments on commit 3f0e542

Please sign in to comment.