diff --git a/conf/monitor/monitor/monitor.yaml b/conf/monitor/monitor/monitor.yaml index 1adfa66cff4..60fda01cef7 100644 --- a/conf/monitor/monitor/monitor.yaml +++ b/conf/monitor/monitor/monitor.yaml @@ -114,6 +114,8 @@ log-storage-cassandra: keyspace: name: "spot_prod" erda.core.monitor.log.query: + download_api_throttling: + current_limit: ${LOG_DOWNLOAD_API_THROTTLING_CURRENT_LIMIT:200} # event elasticsearch@event: diff --git a/modules/core/monitor/log/query/log.query.service.go b/modules/core/monitor/log/query/log.query.service.go index 17cd16a4fdb..2dcf9034be3 100644 --- a/modules/core/monitor/log/query/log.query.service.go +++ b/modules/core/monitor/log/query/log.query.service.go @@ -18,6 +18,7 @@ import ( context "context" "fmt" "sort" + "sync/atomic" "time" linq "github.com/ahmetb/go-linq/v3" @@ -32,11 +33,12 @@ import ( ) type logQueryService struct { - p *provider - startTime int64 - storageReader storage.Storage - k8sReader storage.Storage - frozenStorageReader storage.Storage + p *provider + startTime int64 + storageReader storage.Storage + k8sReader storage.Storage + frozenStorageReader storage.Storage + currentDownloadLimit *int64 } func (s *logQueryService) GetLog(ctx context.Context, req *pb.GetLogRequest) (*pb.GetLogResponse, error) { @@ -254,9 +256,17 @@ func (s *logQueryService) queryLogItems(ctx context.Context, req Request, fn fun } func (s *logQueryService) walkLogItems(ctx context.Context, req Request, fn func(sel *storage.Selector) (*storage.Selector, error), walk func(item *pb.LogItem) error) error { - //if req.GetCount() < 0 { + // if req.GetCount() < 0 { // return errors.NewInvalidParameterError("count", "not allowed negative") - //} + // } + if s.currentDownloadLimit != nil { + if atomic.LoadInt64(s.currentDownloadLimit) < 1 { + return fmt.Errorf("current download reached, please wait for a while") + } + atomic.AddInt64(s.currentDownloadLimit, -1) + defer atomic.AddInt64(s.currentDownloadLimit, 1) + } + sel, err := toQuerySelector(req) if err != nil { return err diff --git a/modules/core/monitor/log/query/provider.go b/modules/core/monitor/log/query/provider.go index 0064e325028..32acfc3694e 100644 --- a/modules/core/monitor/log/query/provider.go +++ b/modules/core/monitor/log/query/provider.go @@ -29,7 +29,11 @@ import ( perm "github.com/erda-project/erda/pkg/common/permission" ) -type config struct{} +type config struct { + DownloadAPIThrottling struct { + CurrentLimit int64 `file:"current_limit"` + } `file:"download_api_throttling"` +} type provider struct { Cfg *config @@ -52,6 +56,9 @@ func (p *provider) Init(ctx servicehub.Context) error { k8sReader: p.K8sReader, frozenStorageReader: p.FrozenStorageReader, } + if p.Cfg.DownloadAPIThrottling.CurrentLimit > 0 { + p.logQueryService.currentDownloadLimit = &p.Cfg.DownloadAPIThrottling.CurrentLimit + } if p.Register != nil { pb.RegisterLogQueryServiceImp(p.Register, p.logQueryService, apis.Options(), p.Perm.Check( perm.NoPermMethod(pb.LogQueryServiceServer.GetLog), diff --git a/modules/core/monitor/storekit/elasticsearch/iterator.go b/modules/core/monitor/storekit/elasticsearch/iterator.go index 12a8840c867..206d45d480e 100644 --- a/modules/core/monitor/storekit/elasticsearch/iterator.go +++ b/modules/core/monitor/storekit/elasticsearch/iterator.go @@ -128,6 +128,7 @@ type searchIterator struct { sorts []*SortItem searchAfter []interface{} decode func(data *elastic.SearchHit) (interface{}, error) + limiter storekit.RateLimiter lastID string lastSortValues []interface{} @@ -196,7 +197,7 @@ func (it *searchIterator) Error() error { return it.err } -func (it *searchIterator) fetch(dir iteratorDir) error { +func (it *searchIterator) fetch(dir iteratorDir) { it.dir = dir var reverse bool if it.dir == iteratorBackward { @@ -204,45 +205,46 @@ func (it *searchIterator) fetch(dir iteratorDir) error { } it.buffer = nil for it.err == nil && len(it.buffer) <= 0 { - func() error { - searchSource, err := it.search() - if err != nil { - it.err = err - return it.err - } - ss := it.client.Search(it.indices...).IgnoreUnavailable(true).AllowNoIndices(true).Timeout(it.timeoutMS). - SearchSource(searchSource).From(it.fromOffset).Size(it.pageSize).SearchAfter(it.lastSortValues...) - if reverse { - for _, item := range it.sorts { - ss = ss.Sort(item.Key, !item.Ascending) - } - } else { - for _, item := range it.sorts { - ss = ss.Sort(item.Key, item.Ascending) - } - } - var resp *elastic.SearchResult - ctx, cancel := context.WithTimeout(it.ctx, it.timeout) - defer cancel() - resp, it.err = ss.Do(ctx) - if it.err != nil { - return it.err - } - if resp == nil || resp.Hits == nil || len(resp.Hits.Hits) <= 0 { - it.err = io.EOF - return it.err + searchSource, err := it.search() + if err != nil { + it.err = err + continue + } + ss := it.client.Search(it.indices...).IgnoreUnavailable(true).AllowNoIndices(true).Timeout(it.timeoutMS). + SearchSource(searchSource).From(it.fromOffset).Size(it.pageSize).SearchAfter(it.lastSortValues...) + if reverse { + for _, item := range it.sorts { + ss = ss.Sort(item.Key, !item.Ascending) } - atomic.SwapInt64(&it.total, resp.TotalHits()) - it.totalCached = true - it.buffer = it.parseHits(resp.Hits.Hits) - if len(resp.Hits.Hits) < it.pageSize { - it.err = io.EOF - return it.err + } else { + for _, item := range it.sorts { + ss = ss.Sort(item.Key, item.Ascending) } - return nil - }() + } + resp, err := it.doRequest(ss) + if err != nil { + it.err = err + continue + } + + if resp == nil || resp.Hits == nil || len(resp.Hits.Hits) <= 0 { + it.err = io.EOF + continue + } + atomic.SwapInt64(&it.total, resp.TotalHits()) + it.totalCached = true + it.buffer = it.parseHits(resp.Hits.Hits) + if len(resp.Hits.Hits) < it.pageSize { + it.err = io.EOF + continue + } } - return nil +} + +func (it *searchIterator) doRequest(ss *elastic.SearchService) (*elastic.SearchResult, error) { + ctx, cancel := context.WithTimeout(it.ctx, it.timeout) + defer cancel() + return ss.Do(ctx) } func (it *searchIterator) count() error { diff --git a/modules/core/monitor/storekit/ratelimit.go b/modules/core/monitor/storekit/ratelimit.go new file mode 100644 index 00000000000..9fc861a465f --- /dev/null +++ b/modules/core/monitor/storekit/ratelimit.go @@ -0,0 +1,59 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storekit + +import ( + "time" + + "golang.org/x/time/rate" +) + +type RateLimiter interface { + // pass n token to limiter, return the duration you must wait + ReserveN(n int) time.Duration +} + +type RateLimitConfig struct { + Duration time.Duration `file:"duration"` + Limit int `file:"limit"` +} + +type InMemoryRateLimiter struct { + limiter *rate.Limiter +} + +func NewInMemoryRateLimiter(cfg RateLimitConfig) *InMemoryRateLimiter { + limit := cfg.Limit / int(cfg.Duration.Seconds()) + return &InMemoryRateLimiter{limiter: rate.NewLimiter(rate.Limit(limit), limit)} +} + +func (im *InMemoryRateLimiter) ReserveN(n int) time.Duration { + now := time.Now() + r := im.limiter.ReserveN(now, n) + // if n > burst, then split n to multi b and get the total delay + if !r.OK() { + b, d := im.limiter.Burst(), time.Duration(0) + for n > 0 { + tmp := im.limiter.ReserveN(now, b) + if !tmp.OK() { + panic("sub reserve is not OK") + } + d += tmp.Delay() + n -= b + } + return d + } + return r.Delay() +} diff --git a/modules/core/monitor/storekit/ratelimit_test.go b/modules/core/monitor/storekit/ratelimit_test.go new file mode 100644 index 00000000000..a90e8f83f46 --- /dev/null +++ b/modules/core/monitor/storekit/ratelimit_test.go @@ -0,0 +1,51 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storekit + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestInMemoryRateLimiter_ReserveN(t *testing.T) { + limit := 10 + im := NewInMemoryRateLimiter(RateLimitConfig{ + Duration: time.Second, + Limit: limit, + }) + + ass := assert.New(t) + + d := roundDelay(im.ReserveN(2 * limit)) + ass.Equal(time.Second, d) + time.Sleep(d) // wait delay + time.Sleep(time.Second) // wait token full + + d = roundDelay(im.ReserveN(3 * limit)) + ass.Equal(time.Second*3, d) + time.Sleep(d) + time.Sleep(time.Second) // wait token full + + d = roundDelay(im.ReserveN(limit)) + ass.Equal(time.Duration(0), d) +} + +// ignore small timing issue +func roundDelay(d time.Duration) time.Duration { + return time.Second * time.Duration(math.Round(float64(d)/float64(time.Second))) +}