diff --git a/.erda/migrations/monitor/20210813-monitor-project-dashboard.sql b/.erda/migrations/monitor/20210813-monitor-project-dashboard.sql
new file mode 100644
index 00000000000..8bbc5173dc5
--- /dev/null
+++ b/.erda/migrations/monitor/20210813-monitor-project-dashboard.sql
@@ -0,0 +1,3 @@
+INSERT INTO `sp_dashboard_block_system` (`id`, `name`, `desc`, `domain`, `scope`, `scope_id`, `view_config`, `data_config`, `version`) VALUES ('working_en', 'working_ statistics', '', '', 'org', 'terminus', '[{\"w\":12,\"h\":9,\"x\":0,\"y\":0,\"i\":\"view-98ae03fd\",\"view\":{\"title\":\"actual workload (MD)\",\"description\":\"\",\"chartType\":\"card\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics\",\"query\":{\"alias_sum.issue_elapsed_time\":\"issue_elapsed_time总和\",\"chartType\":\"card\",\"format\":\"chartv2\",\"sum\":[\"issue_elapsed_time\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"5\",\"metric\":\"issue_metrics_statistics-issue_elapsed_time\"}],\"filters\":[],\"group\":[]}}}},{\"w\":12,\"h\":9,\"x\":12,\"y\":0,\"i\":\"view-73d60bcb\",\"view\":{\"title\":\"estimated workload (MD)\",\"description\":\"\",\"chartType\":\"card\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics\",\"query\":{\"alias_sum.issue_estimate_time\":\"issue_estimate_time总和\",\"chartType\":\"card\",\"format\":\"chartv2\",\"sum\":[\"issue_estimate_time\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"16\",\"metric\":\"issue_metrics_statistics-issue_estimate_time\"}],\"filters\":[],\"group\":[]}}}},{\"w\":24,\"h\":9,\"x\":0,\"y\":9,\"i\":\"view-cdb72329\",\"view\":{\"title\":\"member workload distribution 
(MD)\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_user_nick\":\"issue_user_nick\",\"alias_sum.issue_elapsed_time\":\"issue_elapsed_time总和\",\"alias_sum.issue_estimate_time\":\"issue_estimate_time总和\",\"chartType\":\"chart:bar\",\"format\":\"chartv2\",\"limit\":1000,\"group\":\"(tags.issue_user_nick)\",\"sum\":[\"issue_estimate_time\",\"issue_elapsed_time\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"50\",\"metric\":\"issue_metrics_statistics-issue_estimate_time\"},{\"aggregation\":\"sum\",\"key\":\"40\",\"metric\":\"issue_metrics_statistics-issue_elapsed_time\"}],\"filters\":[],\"group\":[\"tags.issue_user_nick\"]}}}},{\"w\":12,\"h\":9,\"x\":0,\"y\":28,\"i\":\"view-2161e559\",\"view\":{\"title\":\"BUG time consuming TOP10 
(MD)\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_title\":\"issue_title\",\"alias_sum.issue_elapsed_time\":\"issue_elapsed_time总和\",\"chartType\":\"chart:bar\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\",\"group\":\"(tags.issue_title)\",\"limit\":10,\"sort\":\"sum_issue_elapsed_time\",\"sum\":[\"issue_elapsed_time\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"235\",\"metric\":\"issue_metrics_statistics-issue_elapsed_time\"}],\"filters\":[{\"key\":\"245\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[\"tags.issue_title\"],\"limit\":10}}}},{\"w\":12,\"h\":9,\"x\":12,\"y\":28,\"i\":\"view-1e55a4eb\",\"view\":{\"title\":\"task time consuming TOP10 
(MD)\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_title\":\"issue_title\",\"alias_sum.issue_elapsed_time\":\"issue_elapsed_time总和\",\"chartType\":\"chart:bar\",\"eq_tags.issue_type\":\"TASK\",\"format\":\"chartv2\",\"group\":\"(tags.issue_title)\",\"limit\":10,\"sort\":\"sum_issue_elapsed_time\",\"sum\":[\"issue_elapsed_time\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"347\",\"metric\":\"issue_metrics_statistics-issue_elapsed_time\"}],\"filters\":[{\"key\":\"357\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"TASK\"}],\"group\":[\"tags.issue_title\"],\"limit\":10}}}},{\"w\":24,\"h\":10,\"x\":0,\"y\":18,\"i\":\"view-970a9935\",\"view\":{\"title\":\"Personnel incident distribution 
(num)\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_user_nick\":\"issue_user_nick\",\"alias_sum.issue_type_BUG\":\"issue_type_BUG总和\",\"alias_sum.issue_type_TASK\":\"issue_type_TASK总和\",\"chartType\":\"chart:bar\",\"format\":\"chartv2\",\"limit\":1000,\"group\":\"(tags.issue_user_nick)\",\"sum\":[\"issue_type_TASK\",\"issue_type_BUG\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"659\",\"metric\":\"issue_metrics_statistics-issue_type_TASK\"},{\"aggregation\":\"sum\",\"key\":\"649\",\"metric\":\"issue_metrics_statistics-issue_type_BUG\"}],\"filters\":[],\"group\":[\"tags.issue_user_nick\"]}}}}]', '[]', '');
+INSERT INTO `sp_dashboard_block_system` (`id`, `name`, `desc`, `domain`, `scope`, `scope_id`, `view_config`, `data_config`, `version`) VALUES ('bug_en', 'bug_statistics', '', '', 'org', 'terminus', '[{\"w\":12,\"h\":10,\"x\":0,\"y\":0,\"i\":\"view-f6623ce7\",\"view\":{\"title\":\"bug are distributed by state\",\"description\":\"\",\"chartType\":\"chart:pie\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics\",\"query\":{\"alias_last.tags.issue_state\":\"issue_state\",\"alias_sum.counts\":\"counts总和\",\"chartType\":\"chart:pie\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\",\"trans_group\":\"true\",\"group\":\"(tags.issue_state)\",\"sum\":[\"counts\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"691\",\"metric\":\"issue_metrics_statistics-counts\"}],\"filters\":[{\"key\":\"9\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[\"tags.issue_state\"]}},\"controls\":null}},{\"w\":24,\"h\":9,\"x\":0,\"y\":10,\"i\":\"view-fd2ecfbe\",\"view\":{\"title\":\"bug add/Close 
trend\",\"description\":\"\",\"chartType\":\"chart:line\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":true,\"noAreaColor\":true,\"moreThanOneDayFormat\":\"M/D\"},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_add_or_repair_metrics_statistics/histogram\",\"query\":{\"chartType\":\"chart:line\",\"interval\":\"24h\",\"format\":\"chartv2\",\"start\":\"${start}\",\"end\":\"${end}\",\"sum\":[\"bug_add_counts\",\"bug_repair_counts\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_add_or_repair_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"263\",\"metric\":\"issue_add_or_repair_metrics_statistics-bug_add_or_repair_counts\"}],\"filters\":[],\"group\":[\"tags.bug_add_or_repair_type\"]}},\"controls\":[{}]}},{\"w\":12,\"h\":10,\"x\":12,\"y\":0,\"i\":\"view-1e86c4a6\",\"view\":{\"title\":\"bug Distribution by severity level\",\"description\":\"\",\"chartType\":\"chart:pie\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics\",\"query\":{\"alias_last.tags.issue_severity\":\"issue_severity\",\"alias_sum.counts\":\"counts总和\",\"chartType\":\"chart:pie\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\",\"trans_group\":\"true\",\"group\":\"(tags.issue_severity)\",\"sum\":[\"counts\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"459\",\"metric\":\"issue_metrics_statistics-counts\"}],\"filters\":[{\"key\":\"24\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[\"tags.issue_severity\"]}},\"controls\":null}},{\"w\":12,\"h\":5,\"x\":12,\"y\":37,\"i\":\"view-91fd7d1f\",\"view\":{\"title\":\"bug defect response time 
(MD)\",\"description\":\"\",\"chartType\":\"card\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics\",\"query\":{\"alias_avg.issue_response_time\":\"issue_response_time平均值\",\"avg\":[\"issue_response_time\"],\"chartType\":\"card\",\"eq_tags.issue_state\":\"CLOSED\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\"},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"avg\",\"key\":\"604\",\"metric\":\"issue_metrics_statistics-issue_response_time\"}],\"filters\":[{\"key\":\"346\",\"method\":\"eq\",\"tag\":\"tags.issue_state\",\"value\":\"CLOSED\"},{\"key\":\"84\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[]}},\"controls\":null}},{\"w\":12,\"h\":5,\"x\":0,\"y\":37,\"i\":\"view-d8d99372\",\"view\":{\"title\":\"bug average repair workload(MD)\",\"description\":\"\",\"chartType\":\"card\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics\",\"query\":{\"alias_avg.issue_elapsed_time\":\"issue_elapsed_time平均值\",\"avg\":[\"issue_elapsed_time\"],\"chartType\":\"card\",\"eq_tags.issue_state\":\"CLOSED\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\"},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"avg\",\"key\":\"308\",\"metric\":\"issue_metrics_statistics-issue_elapsed_time\"}],\"filters\":[{\"key\":\"269\",\"method\":\"eq\",\"tag\":\"tags.issue_state\",\"value\":\"CLOSED\"},{\"key\":\"191\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[]}},\"controls\":null}},{\"w\":24,\"h\":10,\"x\":0,\"y\":42,\"i\":\"view-8b1ee412\",\"view\":{\"title\":\"bug distribution by 
priority\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_user_nick\":\"issue_user_nick\",\"alias_sum.counts\":\"counts总和\",\"alias_sum.not_close\":\"not_close总和\",\"chartType\":\"chart:bar\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\",\"limit\":1000,\"group\":\"(tags.issue_user_nick)\",\"sum\":[\"not_close\",\"counts\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"170\",\"metric\":\"issue_metrics_statistics-not_close\"},{\"aggregation\":\"sum\",\"key\":\"154\",\"metric\":\"issue_metrics_statistics-counts\"}],\"filters\":[{\"key\":\"269\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[\"tags.issue_user_nick\"]}},\"controls\":[{\"key\":\"eq_tags.issue_priority\",\"options\":[{\"name\":\"low\",\"value\":\"LOW\"},{\"name\":\"normal\",\"value\":\"NORMAL\"},{\"name\":\"high\",\"value\":\"HIGH\"},{\"name\":\"urgent\",\"value\":\"URGENT\"}],\"type\":\"select\"}]}},{\"w\":24,\"h\":9,\"x\":0,\"y\":28,\"i\":\"view-a844c5b7\",\"view\":{\"title\":\"bug press to reopen the 
distribution\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_user_nick\":\"issue_user_nick\",\"alias_sum.is_re_open_FATAL\":\"is_re_open_FATAL总和\",\"alias_sum.is_re_open_NORMAL\":\"is_re_open_NORMAL总和\",\"alias_sum.is_re_open_SERIOUS\":\"is_re_open_SERIOUS总和\",\"alias_sum.is_re_open_SLIGHT\":\"is_re_open_SLIGHT总和\",\"alias_sum.is_re_open_SUGGEST\":\"is_re_open_SUGGEST总和\",\"chartType\":\"chart:bar\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\",\"limit\":1000,\"group\":\"(tags.issue_user_nick)\",\"sum\":[\"is_re_open_SUGGEST\",\"is_re_open_SLIGHT\",\"is_re_open_SERIOUS\",\"is_re_open_NORMAL\",\"is_re_open_FATAL\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"1755\",\"metric\":\"issue_metrics_statistics-is_re_open_SUGGEST\"},{\"aggregation\":\"sum\",\"key\":\"1721\",\"metric\":\"issue_metrics_statistics-is_re_open_SLIGHT\"},{\"aggregation\":\"sum\",\"key\":\"1687\",\"metric\":\"issue_metrics_statistics-is_re_open_SERIOUS\"},{\"aggregation\":\"sum\",\"key\":\"1653\",\"metric\":\"issue_metrics_statistics-is_re_open_NORMAL\"},{\"aggregation\":\"sum\",\"key\":\"1637\",\"metric\":\"issue_metrics_statistics-is_re_open_FATAL\"}],\"filters\":[{\"key\":\"1871\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[\"tags.issue_user_nick\"]}},\"controls\":null}},{\"w\":24,\"h\":9,\"x\":0,\"y\":19,\"i\":\"view-7fb8a372\",\"view\":{\"title\":\"bug distribution by 
personnel\",\"description\":\"\",\"chartType\":\"chart:bar\",\"dataSourceType\":\"api\",\"staticData\":{},\"config\":{\"optionProps\":{\"isMoreThanOneDay\":false},\"option\":{\"yAxis\":[{\"name\":\"\"}]}},\"api\":{\"url\":\"/api/project/metrics/issue_metrics_statistics/histogram\",\"query\":{\"alias_last.tags.issue_user_nick\":\"issue_user_nick\",\"alias_sum.counts\":\"counts总和\",\"alias_sum.not_close\":\"not_close总和\",\"chartType\":\"chart:bar\",\"eq_tags.issue_type\":\"BUG\",\"format\":\"chartv2\",\"limit\":1000,\"group\":\"(tags.issue_user_nick)\",\"sum\":[\"not_close\",\"counts\"]},\"body\":null,\"header\":null,\"extraData\":{\"activedMetricGroups\":[\"other\",\"other@issue_metrics_statistics\"],\"activedMetrics\":[{\"aggregation\":\"sum\",\"key\":\"645\",\"metric\":\"issue_metrics_statistics-not_close\"},{\"aggregation\":\"sum\",\"key\":\"635\",\"metric\":\"issue_metrics_statistics-counts\"}],\"filters\":[{\"key\":\"667\",\"method\":\"eq\",\"tag\":\"tags.issue_type\",\"value\":\"BUG\"}],\"group\":[\"tags.issue_user_nick\"]}},\"controls\":[{\"key\":\"eq_tags.issue_severity\",\"options\":[{\"name\":\"suggest\",\"value\":\"SUGGEST\"},{\"name\":\"slight\",\"value\":\"SLIGHT\"},{\"name\":\"serious\",\"value\":\"SERIOUS\"},{\"name\":\"fatal\",\"value\":\"FATAL\"},{\"name\":\"normal\",\"value\":\"NORMAL\"}],\"type\":\"select\"}]}}]', '[]', '');
+
diff --git a/go.mod b/go.mod
index 6a34a815834..956dfc2f745 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
github.com/buger/jsonparser v1.1.1
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/caarlos0/env v0.0.0-20180521112546-3e0f30cbf50b
+ github.com/cespare/xxhash v1.1.0
github.com/confluentinc/confluent-kafka-go v1.5.2
github.com/containerd/console v1.0.2
github.com/coreos/etcd v3.3.25+incompatible
@@ -152,6 +153,7 @@ require (
kmodules.xyz/monitoring-agent-api v0.0.0-20200125202117-d3b3e33ce41f
kmodules.xyz/objectstore-api v0.0.0-20200214040336-fe8f39a4210d
kmodules.xyz/offshoot-api v0.0.0-20200216080509-45ee6418d1c1
+ modernc.org/mathutil v1.0.0
moul.io/http2curl v1.0.0 // indirect
rsc.io/letsencrypt v0.0.3 // indirect
sigs.k8s.io/controller-runtime v0.9.2
diff --git a/go.sum b/go.sum
index 281c430dafa..1a309521784 100644
--- a/go.sum
+++ b/go.sum
@@ -115,6 +115,7 @@ github.com/Microsoft/hcsshim v0.8.14 h1:lbPVK25c1cu5xTLITwpUcxoA9vKrKErASPYygvou
github.com/Microsoft/hcsshim v0.8.14/go.mod h1:NtVKoYxQuTLx6gEq0L96c9Ju4JbRJ4nY2ow3VK6a9Lg=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
@@ -2397,6 +2398,7 @@ kmodules.xyz/offshoot-api v0.0.0-20200216080509-45ee6418d1c1 h1:4ZXeBIppt2MlDPrF
kmodules.xyz/offshoot-api v0.0.0-20200216080509-45ee6418d1c1/go.mod h1:k1M+OOFHbMeS6N6OB2j9smKfMCyQOEQjZ3SLZ/KSe+w=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
+modernc.org/mathutil v1.0.0 h1:93vKjrJopTPrtTNpZ8XIovER7iCIH1QU7wNbOQXC60I=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
diff --git a/modules/cmp/cache/README.md b/modules/cmp/cache/README.md
new file mode 100644
index 00000000000..217595eb018
--- /dev/null
+++ b/modules/cmp/cache/README.md
@@ -0,0 +1,48 @@
+CmpCache
+------------
+
+CmpCache implements an LRU cache bounded by memory usage. Keys in CmpCache are ordered by timestamp and removed lazily.
+
+- [`entry`](#entry)
+  stores cache data as the Value type. Data only supports `Int` | `String` | `Float` | `Bool` | `UnsignedValue`.
+
+- `pair`
+  contains a key, a value, and an overdue timestamp.
+
+- `segment`
+  the cache contains 256 segments.
+  Each key is hashed to a uint64 and assigned to a specific segment.
+
+- `store`
+  contains the segments, a lock for each segment, and a temporary slice for hashing.
+
+
+- benchmark.
+
+
+| method-duration | ns/op |
+| ------------ | ---- |
+| BenchmarkLRU_Rand | 306 |
+| BenchmarkLRU_Freq | 278 |
+| BenchmarkLRU_FreqParallel-8 | 148 |
+
+
+- Interface
+
+  - ``Remove(key string) (Values, error)``
+
+    Remove the key from the cache and return the removed values.
+
+  - ``Set(key string, value Values, overdueTimeStamp int64) error``
+
+    Add a key/value pair to the cache; overdueTimeStamp is a TTL in nanoseconds relative to now.
+
+  - ``IncreaseSize(size int64)``
+
+    Increase the capacity of memory that the cache could use.
+
+  - ``DecrementSize(size int64) error``
+
+    Decrease the capacity of memory that the cache could use.
+
+  - ``Get(key string) (Values, bool, error)``
+
+    Return the value stored for the key. The second return value reports whether the key has expired.
+
diff --git a/modules/cmp/cache/cache.go b/modules/cmp/cache/cache.go
new file mode 100644
index 00000000000..ff06baac323
--- /dev/null
+++ b/modules/cmp/cache/cache.go
@@ -0,0 +1,575 @@
+// Copyright (c) 2021 Terminus, Inc.
+//
+// This program is free software: you can use, redistribute, and/or modify
+// it under the terms of the GNU Affero General Public License, version 3
+// or later ("AGPL"), as published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cache
+
+import (
+ "container/heap"
+ "encoding/json"
+ "fmt"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/cespare/xxhash"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "modernc.org/mathutil"
+)
+
type (
    // Values is the list of typed values stored under one cache key.
    Values []Value
    // EntryType tags the concrete type held by a Value.
    EntryType int
)
+
const (
    // Enumeration of the concrete types a Value may carry.
    UndefinedType EntryType = iota
    IntType
    BoolType
    FloatType
    StringType
    UnsignedType
    ByteType
    ByteSliceType

    // minCacheSize is the smallest accepted cache size: 1<<18 bytes
    // (256 KiB). New rejects anything smaller.
    minCacheSize = 1 << 18
)
+
// Sentinel errors returned by the cache API.
//
// NOTE(review): Go convention would name these ErrXxx (e.g. ErrEvict);
// they are left as-is here because the names are part of the package API.
var (
    EvictError              = errors.New("cache memory evicted")
    NilPtrError             = errors.New("nil ptr error")
    EntryTypeDifferentError = errors.New("entry and cacheValues type different")
    WrongEntryKeyError      = errors.New("entry key is wrong")
    InitEntryFailedError    = errors.New("entry initialized failed")
    ValueNotFoundError      = errors.New("cacheValues not found")
    KeyNotFoundError        = errors.New("key not found")
    KeyTooLongError         = errors.New("key too long")
    ValueTypeNotFoundError  = errors.New("cacheValues type not found")
    IllegalCacheSize        = errors.New("illegal cache size")
)
+
// Cache implement concurrent safe cache with LRU and ttl strategy.
type Cache struct {
    store *store // sharded pair storage
    log   *logrus.Logger
    k     *keyBuilder // maps a key string to one of the 256 segment ids
}
+
// segment is one shard of the cache. It owns a pool of pair slots
// arranged as a heap (see Len/Less/Swap/Push/Pop), a key-to-slot
// mapping, and its share of the overall memory budget.
type segment struct {
    pairs   []*pair        // slot pool; indexes [0:length) are the live heap
    tmp     *pair          // scratch slot staged by updatePair, committed by store.write
    length  int            // number of live pairs
    mapping map[string]int // key -> index into pairs
    maxSize int64          // memory budget for this segment, in bytes
    used    int64          // bytes currently accounted to live pairs
    nextIdx int64          // monotonically increasing recency counter
}
+
+func newPairs(maxSize int64, segNum int) []*segment {
+ ps := make([]*segment, segNum)
+ pairLen := maxSize >> 12
+ for i := 0; i < segNum; i++ {
+ p := make([]*pair, mathutil.Min(int(pairLen), 1024))
+ for j := range p {
+ p[j] = &pair{}
+ }
+ ps[i] = &segment{
+ pairs: p,
+ tmp: &pair{},
+ length: 0,
+ mapping: map[string]int{},
+ maxSize: maxSize / int64(segNum),
+ used: 0,
+ }
+ }
+ return ps
+}
+
// pair is a single cache entry.
type pair struct {
    key              string
    value            Values
    entryType        EntryType // NOTE(review): never assigned anywhere in this file — confirm it is still needed
    overdueTimestamp int64     // absolute expiry in UnixNano (see Cache.Set)
    idx              int64     // recency index assigned by segment.updatePair
}
+
// store implement LRU strategy
// It is the sharded backing storage: one segment plus one RWMutex per shard.
type store struct {
    segs  []*segment
    locks []*sync.RWMutex
    log   *logrus.Logger
    key   []byte // NOTE(review): appears unused — keyBuilder keeps its own buffer; confirm and consider removing
}
+
// Len reports the number of live pairs in the segment (heap.Interface).
func (seg *segment) Len() int {
    return seg.length
}

// Less orders pairs by descending recency index, so the pair with the
// LARGEST idx sits at the heap root.
//
// NOTE(review): store.write evicts from pairs[0]; with this ordering
// that is the most recently indexed entry, not the least — confirm
// this is the intended eviction order for an LRU.
func (seg *segment) Less(i, j int) bool {
    return seg.pairs[i].idx > seg.pairs[j].idx
}

// Swap exchanges two pairs and keeps the key-to-index mapping in sync.
func (seg *segment) Swap(i, j int) {
    mj := seg.mapping[seg.pairs[j].key]
    mi := seg.mapping[seg.pairs[i].key]
    seg.mapping[seg.pairs[i].key] = mj
    seg.mapping[seg.pairs[j].key] = mi
    seg.pairs[i], seg.pairs[j] = seg.pairs[j], seg.pairs[i]
}

// Push is a deliberate no-op: slots are managed by the segment itself;
// store.write places the element into the slice before calling heap.Push.
func (seg *segment) Push(x interface{}) {
    return
}

// Pop is a deliberate no-op returning nil; store.remove reads the
// removed pair directly from the slice after heap.Remove.
func (seg *segment) Pop() interface{} {
    return nil
}
+
// newLocks allocates the 256 per-segment read-write locks.
func newLocks() []*sync.RWMutex {
    locks := make([]*sync.RWMutex, 256)
    for i := 0; i < len(locks); i++ {
        locks[i] = new(sync.RWMutex)
    }
    return locks
}
+
// newStore builds the sharded store: segNum segments sharing maxSize
// bytes of budget, 256 lock shards, and a 1KB scratch buffer.
//
// NOTE(review): locks are always 256 regardless of segNum, and
// keyBuilder.getKeyId maps keys to ids 0..255 — segNum values other
// than 256 would leave segments unaddressed or index out of range;
// confirm only 256 is supported.
func newStore(maxSize int64, segNum int, logger *logrus.Logger) *store {
    return &store{
        log:   logger,
        segs:  newPairs(maxSize, segNum),
        locks: newLocks(),
        key:   make([]byte, 1024),
    }
}
+
+func (s *store) write(id int) error {
+ ps := s.segs[id]
+ newPair := s.segs[id].tmp
+
+ needSize, _ := newPair.getEntrySize()
+ if ps.maxSize < needSize {
+ s.log.Errorf("evict cache size,try next")
+ return EvictError
+ }
+ usage := ps.used
+ for ps.maxSize-usage < needSize {
+ p := ps.pairs[0]
+ s.remove(id, p.key)
+ entrySize, _ := p.getEntrySize()
+ usage -= entrySize
+ //s.log.Warnf("memory not sufficient ,%v has poped", p.key)
+ }
+ usage += needSize
+ idx := ps.Len()
+ ps.pairs[idx].key = newPair.key
+ ps.pairs[idx].value = newPair.value
+ ps.pairs[idx].overdueTimestamp = newPair.overdueTimestamp
+ ps.mapping[newPair.key] = idx
+ heap.Push(ps, ps.pairs[idx])
+ ps.length++
+ ps.used = usage
+ return nil
+}
+
// remove deletes key from segment id and returns the removed pair, or
// ValueNotFoundError when the key is absent. The caller must hold the
// segment's write lock.
//
// heap.Remove swaps the target with the last live element and sifts;
// because this segment's Pop is a no-op, the removed pair is left in
// the slice at the old last index rather than returned by heap.Remove.
// It is recovered from ps.pairs[ps.Len()] after length is decremented.
func (s *store) remove(id int, key string) (*pair, error) {
    var (
        idx int
        ok  bool
    )
    ps := s.segs[id]
    if idx, ok = ps.mapping[key]; !ok {
        return nil, ValueNotFoundError
    }
    heap.Remove(ps, idx)
    ps.length--
    // The removed pair now sits just past the live region.
    p := ps.pairs[ps.Len()]
    cacheSize, _ := p.getEntrySize()
    delete(ps.mapping, p.key)
    ps.used -= cacheSize
    return p, nil
}
+
// CmpCache is the public contract of the cache.
//
// NOTE(review): the package README documents Write/WriteMulti, but the
// implementation exposes Set — the README appears stale.
type CmpCache interface {
    Remove(key string) (Values, error)
    // Set stores value under key; overdueTimeStamp is a TTL in
    // nanoseconds relative to now (see Cache.Set).
    Set(key string, value Values, overdueTimeStamp int64) error
    IncreaseSize(size int64)
    DecrementSize(size int64) error
    Get(key string) (Values, bool, error)
}
+
+// updatePair update pair
+func (seg *segment) updatePair(key string, newValues Values, overdueTimestamp int64) error {
+ length := len(newValues)
+ if length == 0 {
+ return InitEntryFailedError
+ }
+ seg.tmp.overdueTimestamp = overdueTimestamp
+ seg.tmp.value = newValues
+ seg.tmp.key = key
+ seg.tmp.idx = seg.nextIdx
+ seg.nextIdx++
+ return nil
+}
+
+// getEntrySize returns total size of values.
+func (p *pair) getEntrySize() (int64, error) {
+ if p.value == nil {
+ return 0, NilPtrError
+ }
+ var usage = int64(0)
+ for _, v := range p.value {
+ usage += v.Size()
+ }
+ return usage, nil
+}
+
// Value is a typed cache value with a known memory footprint.
type Value interface {
    // String returns a human-readable rendering of the value.
    String() string
    // Type returns the EntryType tag of the concrete value.
    Type() EntryType
    // Size returns the approximate memory footprint in bytes, used by
    // the segment memory accounting.
    Size() int64
    // Value returns the underlying Go value.
    Value() interface{}
}
+
// ByteValue wraps a single byte.
type ByteValue struct {
    value byte
}

func (b ByteValue) String() string {
    return string(b.value)
}

func (b ByteValue) Type() EntryType {
    return ByteType
}

// Size reports 9 bytes — presumably 1 byte of payload plus 8 bytes of
// bookkeeping overhead; confirm the accounting convention.
func (b ByteValue) Size() int64 {
    return 9
}

func (b ByteValue) Value() interface{} {
    return b.value
}

// ByteSliceValue wraps a byte slice. Size counts only the slice
// contents, with no header overhead (unlike the scalar value types).
type ByteSliceValue struct {
    value []byte
}

func (b ByteSliceValue) String() string {
    return string(b.value)
}

func (b ByteSliceValue) Type() EntryType {
    return ByteSliceType
}

func (b ByteSliceValue) Size() int64 {
    return int64(len(b.value))
}

func (b ByteSliceValue) Value() interface{} {
    return b.value
}
+
// FloatValue wraps a float64.
type FloatValue struct {
    value float64
}

// Size reports 16 bytes — presumably 8 of payload plus 8 of overhead.
func (f FloatValue) Size() int64 {
    return 16
}

func (f FloatValue) String() string {
    return fmt.Sprintf("%f", f.value)
}

func (f FloatValue) Type() EntryType {
    return FloatType
}

func (f FloatValue) Value() interface{} {
    return f.value
}

// IntValue wraps an int64.
type IntValue struct {
    value int64
}

func (i IntValue) Size() int64 {
    return 16
}

func (i IntValue) String() string {
    return fmt.Sprintf("%v", i.value)
}

func (i IntValue) Type() EntryType {
    return IntType
}

func (i IntValue) Value() interface{} {
    return i.value
}
+
// StringValue wraps a string.
type StringValue struct {
    value string
}

// Size reports 8 bytes of overhead plus the string's byte length.
func (s StringValue) Size() int64 {
    return 8 + int64(len(s.value))
}

func (s StringValue) String() string {
    return s.value
}

func (s StringValue) Type() EntryType {
    return StringType
}

func (s StringValue) Value() interface{} {
    return s.value
}

// UnsignedValue wraps a uint64.
type UnsignedValue struct {
    value uint64
}

func (u UnsignedValue) Size() int64 {
    return 16
}

func (u UnsignedValue) String() string {
    return fmt.Sprintf("%v", u.value)
}

func (u UnsignedValue) Type() EntryType {
    return UnsignedType
}

func (u UnsignedValue) Value() interface{} {
    return u.value
}

// BoolValue wraps a bool.
type BoolValue struct {
    value bool
}

func (b BoolValue) Size() int64 {
    return 9
}

// String renders the bool as "true" or "false".
func (b BoolValue) String() string {
    if b.value {
        return "true"
    }
    return "false"
}

func (b BoolValue) Type() EntryType {
    return BoolType
}

func (b BoolValue) Value() interface{} {
    return b.value
}
+
// New returns cache.
// parma size means memory cache can use.
//
// Returns nil when size is below minCacheSize (1<<18 bytes = 256 KiB);
// callers must check for nil.
//
// NOTE(review): keyBuilder.getKeyId addresses segments 0..255, so
// segNum should be 256 (as the tests use); confirm whether other
// values are supported.
func New(size int64, segNum int) *Cache {
    log := logrus.New()
    if size < minCacheSize {
        log.Errorf("cache size too small, 256KB at least")
        return nil
    }
    cache := &Cache{
        store: newStore(size, segNum, log),
        log:   log,
        k:     newKeyBuilder(),
    }
    return cache
}
+
// Set write key, cacheValues, overdueTimestamp into cache.
// whether update or add cache, remove is first.
//
// Note: despite its name, the overdueTimestamp parameter is a TTL in
// nanoseconds relative to now — the stored expiry is computed as
// time.Now().UnixNano() + overdueTimestamp.
func (c *Cache) Set(key string, value Values, overdueTimestamp int64) error {
    var err error
    id, err := c.k.getKeyId(key)
    if err != nil {
        return err
    }
    seg := c.store.segs[id]
    lock := c.store.locks[id]
    lock.Lock()
    defer lock.Unlock()
    // 1. remove old cache
    // (the removal error is deliberately ignored: the key may simply
    // be absent on a fresh insert)
    c.store.remove(id, key)
    // 2. set data into segment tmp
    err = seg.updatePair(key, value, time.Now().UnixNano()+overdueTimestamp)
    if err != nil {
        return err
    }
    // 3. add pair into segment
    err = c.store.write(id)
    //c.log.Infof("%v has add in cache", key)
    if err != nil {
        return err
    }
    return nil
}
+
+// Remove remove cache
+func (c *Cache) Remove(key string) (Values, error) {
+ keyId, err := c.k.getKeyId(key)
+ if err != nil {
+ return nil, err
+ }
+ lock := c.store.locks[keyId]
+ lock.Lock()
+ defer lock.Unlock()
+ remove, err := c.store.remove(keyId, key)
+ return remove.value, nil
+}
+
+// Len return overdueTimestamp of key in cache
+func (c *Cache) Len() int {
+ cnt := 0
+ for i, pair := range c.store.segs {
+ c.store.locks[i].RLock()
+ cnt += pair.length
+ c.store.locks[i].RUnlock()
+ }
+ return cnt
+}
+
+// Get returns cache from key whether key is expired.
+func (c *Cache) Get(key string) (Values, bool, error) {
+ var (
+ err error
+ keyId int
+ freshPair *pair
+ )
+ keyId, err = c.k.getKeyId(key)
+ if err != nil {
+ return nil, false, err
+ }
+ lock := c.store.locks[keyId]
+ seg := c.store.segs[keyId]
+ lock.Lock()
+ defer lock.Unlock()
+ if p, err := c.store.remove(keyId, key); err == nil {
+ freshPair = p
+ } else {
+ return nil, false, ValueNotFoundError
+ }
+ err = seg.updatePair(key, freshPair.value, freshPair.overdueTimestamp)
+ if err != nil {
+ return nil, false, err
+ }
+ err = c.store.write(keyId)
+ if err != nil {
+ return nil, false, err
+ }
+ if freshPair.overdueTimestamp < int64(time.Now().Nanosecond()) {
+ c.log.Warnf("%v has expired ", key)
+ return freshPair.value, true, nil
+ }
+ return freshPair.value, false, nil
+}
+
// IncreaseSize add specific size of max size.
// It is currently an unimplemented no-op stub.
func (c *Cache) IncreaseSize(size int64) {
}

// DecrementSize reduce specific size of max size.
// It is currently an unimplemented no-op stub and always returns nil.
func (c *Cache) DecrementSize(size int64) error {
    return nil
}
+
// keyBuilder hashes cache keys to segment ids in the range 0..255.
type keyBuilder struct {
    b   []byte      // 1KB scratch buffer used while hashing
    mtx *sync.Mutex // guards b across concurrent callers
}
+
+func newKeyBuilder() *keyBuilder {
+ return &keyBuilder{b: make([]byte, 1024), mtx: &sync.Mutex{}}
+}
+
+func (k *keyBuilder) getKeyId(str string) (int, error) {
+ if len(str) > 1024 {
+ return -1, KeyTooLongError
+ }
+ k.mtx.Lock()
+ defer k.mtx.Unlock()
+ for i := range str {
+ k.b[i] = str[i]
+ }
+ return int(xxhash.Sum64(k.b[:len(str)]) & 255), nil
+}
+
// GenerateKey joins keys into a single cache key. The slice is sorted
// in descending order first so the same set of keys always yields the
// same result regardless of input order. Note: the caller's slice is
// reordered in place.
func GenerateKey(keys []string) string {
    sort.Sort(sort.Reverse(sort.StringSlice(keys)))
    var b strings.Builder
    for _, k := range keys {
        b.WriteString(k)
    }
    return b.String()
}
+
+func MarshalValue(o interface{}) (Values, error) {
+ d, err := json.Marshal(o)
+ if err != nil {
+ return nil, err
+ }
+ return Values{ByteSliceValue{
+ value: d,
+ }}, nil
+}
+
+func GetByteValue(d byte) (Values, error) {
+ return Values{ByteValue{
+ value: d,
+ }}, nil
+}
+
+func GetIntValue(d int64) (Values, error) {
+ return Values{IntValue{
+ value: d,
+ }}, nil
+}
+
+func GetUnsignedValue(d uint64) (Values, error) {
+ return Values{UnsignedValue{
+ value: d,
+ }}, nil
+}
+
+func GetBoolValue(d bool) (Values, error) {
+ return Values{BoolValue{
+ value: d,
+ }}, nil
+}
+
+func GetByteSliceValue(d []byte) (Values, error) {
+ return Values{ByteSliceValue{
+ value: d,
+ }}, nil
+}
+
+func GetStringValue(d string) (Values, error) {
+ return Values{StringValue{
+ value: d,
+ }}, nil
+}
diff --git a/modules/cmp/cache/cache_test.go b/modules/cmp/cache/cache_test.go
new file mode 100644
index 00000000000..6bf5735fcfd
--- /dev/null
+++ b/modules/cmp/cache/cache_test.go
@@ -0,0 +1,482 @@
+// Copyright (c) 2021 Terminus, Inc.
+//
+// This program is free software: you can use, redistribute, and/or modify
+// it under the terms of the GNU Affero General Public License, version 3
+// or later ("AGPL"), as published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cache
+
+import (
+ "fmt"
+ "math/rand"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "modernc.org/mathutil"
+)
+
+func TestCache_DecrementSize(t *testing.T) {
+ cache := New(1<<30, 256)
+ type args struct {
+ size int64
+ }
+ tests := []struct {
+ name string
+ args args
+ }{
+ {
+ name: "DecrementTest",
+ args: args{
+ 100,
+ },
+ },
+ {
+ name: "DecrementTest",
+
+ args: args{
+ 100,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+
+ err := cache.DecrementSize(tt.args.size)
+ if err != nil {
+ t.Errorf("DecrementSize() error = %v", err)
+ }
+ })
+ }
+}
+
+func TestCache_Get(t *testing.T) {
+ cache := New(256*1024, 256)
+ type args struct {
+ key string
+ }
+ tests := []struct {
+ name string
+ args args
+ want Values
+ wantErr bool
+ }{
+ {"Get_Test",
+
+ args{"metrics1"},
+ Values{IntValue{1}},
+ false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := cache.Set("metrics1", Values{IntValue{
+ value: 1,
+ }}, int64(1))
+
+ got, _, err := cache.Get("metrics1")
+
+ if err != nil {
+ t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Get() got = %v, want %v", got, IntValue{2})
+ }
+
+ got, _, err = cache.Get("metrics2")
+
+ if err == nil {
+ t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Get() got = %v, want %v", got, IntValue{2})
+ }
+ })
+ }
+}
+
+func TestCache_IncreaseSize(t *testing.T) {
+ cache := New(256*1024, 256)
+ type args struct {
+ size int64
+ }
+ tests := []struct {
+ name string
+ args args
+ }{
+ {"IncreaseSize_Test",
+ args{
+ 10,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ cache.IncreaseSize(tt.args.size)
+ })
+ }
+}
+
+func TestCache_Remove(t *testing.T) {
+ cache := New(256*1024, 256)
+ type args struct {
+ key string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "RemoveTest",
+ args: args{key: "metrics1"},
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ cache.Set("metrics1", Values{IntValue{
+ value: 0,
+ }}, int64(1))
+ if _, err := cache.Remove(tt.args.key); (err != nil) != tt.wantErr {
+ t.Errorf("Remove() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
+
+func TestCache_Write(t *testing.T) {
+ cache := New(1024*256, 256)
+ if cache == nil {
+ t.Fail()
+ return
+ }
+ type args struct {
+ pairs map[string]Values
+ }
+ tests := []struct {
+ name string
+
+ args args
+ wantErr bool
+ }{
+ {
+ name: "WriteTest",
+ args: args{
+ pairs: map[string]Values{
+ "metricsInt": {
+ IntValue{
+ value: 0,
+ },
+ IntValue{
+ value: 10,
+ },
+ },
+
+ "metricsStr": {
+ StringValue{
+ value: "123123131",
+ },
+ StringValue{
+ value: "3213123",
+ },
+ StringValue{
+ value: "4121231",
+ },
+ },
+ "metricsFloat": {
+ FloatValue{
+ value: 3.1415,
+ },
+ FloatValue{
+ value: 3.32,
+ },
+ },
+ "metricsUint": {
+ UnsignedValue{
+ value: ^uint64(0),
+ },
+ UnsignedValue{
+ value: ^uint64(0) >> 1,
+ },
+ },
+ "metricsBool": {
+ BoolValue{
+ value: true,
+ },
+ BoolValue{
+ value: true,
+ },
+ },
+ "metricsIntSeria": {
+ IntValue{
+ value: 0,
+ }, IntValue{
+ value: 2,
+ },
+ },
+ "metricsStrSeria": {
+ StringValue{
+ value: "123123131",
+ },
+ },
+ "metricsFloatSeria": {
+ FloatValue{
+ value: 3.1415,
+ },
+ FloatValue{
+ value: 3.1414,
+ },
+ },
+ "metricsUintSeria": {
+ UnsignedValue{
+ value: ^uint64(0),
+ },
+ UnsignedValue{
+ value: 0,
+ },
+ },
+ "metricsBoolSeria": {
+ BoolValue{
+ value: true,
+ },
+ BoolValue{
+ value: true,
+ },
+ },
+ },
+ },
+ wantErr: false,
+ },
+ {
+ name: "UpdateTest",
+ args: args{
+ pairs: map[string]Values{
+ "metricsInt": {IntValue{
+ value: 1,
+ }},
+ "metricsStr": {
+ StringValue{
+ value: "31",
+ },
+ },
+ "metricsFloat": {
+ FloatValue{
+ value: 3.52414124124,
+ },
+ },
+ "metricsUint": {
+ UnsignedValue{
+ value: ^uint64(0) >> 1,
+ },
+ },
+ "metricsIntSeria": {
+ IntValue{
+ value: 200,
+ },
+ IntValue{
+ value: 200,
+ },
+ },
+ },
+ },
+ wantErr: false,
+ },
+ {
+ name: "WriteBigDataTest",
+ args: args{
+
+ pairs: map[string]Values{
+
+ "metricsStr": {
+ StringValue{
+ value: string(make([]byte, 1024*1024)),
+ },
+ },
+ },
+ },
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ for k, v := range tt.args.pairs {
+ if err := cache.Set(k, v, time.Now().UnixNano()); (err != nil) != tt.wantErr {
+ t.Errorf("WriteMulti() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ }
+
+ })
+
+ }
+}
+
+func BenchmarkLRU_Rand(b *testing.B) {
+ l := New(256*1024, 256)
+ l.log.SetLevel(logrus.ErrorLevel)
+ trace := make([]string, b.N*2)
+ for i := 0; i < b.N*2; i++ {
+ trace[i] = fmt.Sprintf("%d", rand.Int63()%32768)
+ }
+
+ b.ResetTimer()
+ v := Values{IntValue{1}}
+ var hit, miss int
+ for i := 0; i < 2*b.N; i++ {
+ if i%2 == 0 {
+ l.Set(trace[i], v, mathutil.MaxInt)
+ } else {
+ _, _, err := l.Get(trace[i])
+ if err != nil {
+ miss++
+ } else {
+ hit++
+ }
+ }
+ }
+ b.Logf("hit: %d miss: %d ratio: %f", hit, miss, float64(hit)/float64(miss))
+}
+
+func BenchmarkLRU_Freq1(b *testing.B) {
+ c := New(1024*1024, 256)
+ if c == nil {
+ b.Fail()
+ return
+ }
+ c.log.SetLevel(logrus.ErrorLevel)
+ trace := make([]string, b.N*2)
+ for i := 0; i < b.N*2; i++ {
+ if i%2 == 0 {
+ trace[i] = fmt.Sprintf("%d", rand.Int63()%16384)
+ } else {
+ trace[i] = fmt.Sprintf("%d", rand.Int63()%32768)
+ }
+ }
+
+ v := Values{IntValue{1}}
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ c.Set(trace[i], v, mathutil.MaxInt)
+ }
+ var hit, miss int
+
+ for i := 0; i < b.N; i++ {
+ _, _, err := c.Get(trace[i])
+ if err != nil {
+ miss++
+ } else {
+ hit++
+ }
+ }
+ b.Logf("hit: %d miss: %d ratio: %f", hit, miss, float64(hit)/float64(miss))
+}
+
+func BenchmarkLRU_FreqParallel(b *testing.B) {
+ c := New(256*1024, 256)
+ if c == nil {
+ b.Fail()
+ return
+ }
+ c.log.SetLevel(logrus.ErrorLevel)
+ trace := make([]string, b.N*2)
+ v := Values{IntValue{1}}
+ for i := 0; i < b.N*2; i++ {
+ if i%2 == 0 {
+ trace[i] = fmt.Sprintf("%d", rand.Int63()%16384)
+ } else {
+ trace[i] = fmt.Sprintf("%d", rand.Int63()%32768)
+ }
+ }
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ counter := 0
+ b.ReportAllocs()
+ for pb.Next() {
+ c.Set(trace[counter], v, int64(counter))
+ counter = counter + 1
+ if counter > b.N {
+ counter = 0
+ }
+ }
+ })
+}
+
+func TestLRU(t *testing.T) {
+
+ c := New(256*1024, 256)
+ if c == nil {
+ t.Fail()
+ return
+ }
+ for i := 0; i < 1024; i++ {
+ err := c.Set(fmt.Sprintf("%d", i), Values{IntValue{
+ value: int64(i),
+ }}, int64(i))
+ if err != nil {
+ t.Fatalf("cache insert error %v", err)
+ }
+ }
+ if c.Len() != 1024 {
+ t.Fatalf("bad len: %v", c.Len())
+ }
+
+ for i := 0; i < 128; i++ {
+ if v, _, ok := c.Get(fmt.Sprintf("%d", i+128)); ok != nil || int(v[0].(IntValue).value) != i+128 {
+ t.Fatalf("bad key: %v", i+128)
+ }
+ }
+ for i := 128; i < 256; i++ {
+ _, _, err := c.Get(fmt.Sprintf("%d", i))
+ if err != nil {
+ t.Fatalf("should not be evicted")
+ }
+ }
+ for i := 128; i < 192; i++ {
+ _, err := c.Remove(fmt.Sprintf("%d", i))
+ if err != nil {
+ t.Fatalf("should be deleted")
+ }
+ _, _, err = c.Get(fmt.Sprintf("%d", i))
+ if err == nil {
+ t.Fatalf("should be deleted")
+ }
+ }
+
+ _, _, err := c.Get(fmt.Sprintf("%d", 192))
+ if err != nil {
+ return
+ } // expect 192 to be last key in c.Keys()
+
+ if c.Len() != 960 {
+ t.Fatalf("bad len: %v", c.Len())
+ }
+ if _, _, ok := c.Get(fmt.Sprintf("%d", 960)); ok != nil {
+ t.Fatalf("should contain nothing")
+ }
+
+ for i := 0; i < 10240; i++ {
+
+ err := c.Set(fmt.Sprintf("%d", i), Values{IntValue{
+ value: int64(i),
+ }}, int64(i))
+ if err != nil {
+ t.Fatalf("cache insert error %v", err)
+ }
+ }
+ for i := 0; i < 10240; i++ {
+ v, _, ok := c.Get(fmt.Sprintf("%d", i))
+ if ok == nil && int(v[0].(IntValue).value) != i {
+ t.Fatalf("bad key: %v", i)
+ }
+ }
+}
diff --git a/modules/dop/endpoints/pipeline.go b/modules/dop/endpoints/pipeline.go
index 547177df053..cc7102b713b 100644
--- a/modules/dop/endpoints/pipeline.go
+++ b/modules/dop/endpoints/pipeline.go
@@ -644,7 +644,6 @@ func (e *Endpoints) checkrunCreate(ctx context.Context, r *http.Request, vars ma
if diceworkspace.IsRefPatternMatch(gitEvent.Content.TargetBranch, pipelineYml.Spec().On.Merge.Branches) {
exist = true
- break
}
if !exist {
diff --git a/modules/openapi/api/generate/generate_doc.go b/modules/openapi/api/generate/generate_doc.go
index 22d784b368f..8fe330590c3 100644
--- a/modules/openapi/api/generate/generate_doc.go
+++ b/modules/openapi/api/generate/generate_doc.go
@@ -24,7 +24,6 @@ import (
"github.com/erda-project/erda/pkg/swagger/oas3"
)
-
func generateDoc(onlyOpenapi bool, resultfile string) {
var (
apisM = make(map[string][]*apis.ApiSpec)
@@ -56,7 +55,7 @@ func generateDoc(onlyOpenapi bool, resultfile string) {
func writeSwagger(filename, title string, v3 *openapi3.Swagger) error {
filename = filepath.Base(filename)
- filename = strings.TrimSuffix(filename, filepath.Ext(filename))+".yml"
+ filename = strings.TrimSuffix(filename, filepath.Ext(filename)) + ".yml"
filename = title + "-" + filename
f, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0666)
diff --git a/modules/pipeline/services/pipelinesvc/passedDataWhenCreate.go b/modules/pipeline/services/pipelinesvc/passedDataWhenCreate.go
index 704847f001d..f58badc9acf 100644
--- a/modules/pipeline/services/pipelinesvc/passedDataWhenCreate.go
+++ b/modules/pipeline/services/pipelinesvc/passedDataWhenCreate.go
@@ -14,6 +14,8 @@
package pipelinesvc
import (
+ "sync"
+
"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/services/apierrors"
"github.com/erda-project/erda/modules/pipeline/services/extmarketsvc"
@@ -27,6 +29,7 @@ type passedDataWhenCreate struct {
extMarketSvc *extmarketsvc.ExtMarketSvc
actionJobDefines map[string]*diceyml.Job
actionJobSpecs map[string]*apistructs.ActionSpec
+ lock sync.Mutex
}
func (that *passedDataWhenCreate) getActionJobDefine(actionTypeVersion string) *diceyml.Job {
@@ -61,6 +64,7 @@ func (that *passedDataWhenCreate) initData(extMarketSvc *extmarketsvc.ExtMarketS
that.actionJobSpecs = make(map[string]*apistructs.ActionSpec)
}
that.extMarketSvc = extMarketSvc
+ that.lock = sync.Mutex{}
}
func (that *passedDataWhenCreate) putPassedDataByPipelineYml(pipelineYml *pipelineyml.PipelineYml) error {
@@ -90,6 +94,10 @@ func (that *passedDataWhenCreate) putPassedDataByPipelineYml(pipelineYml *pipeli
if err != nil {
return apierrors.ErrCreatePipelineGraph.InternalError(err)
}
+
+ that.lock.Lock()
+ defer that.lock.Unlock()
+
for extItem, actionJobDefine := range actionJobDefines {
that.actionJobDefines[extItem] = actionJobDefine
}
diff --git a/modules/scheduler/executor/plugins/edas/edas.go b/modules/scheduler/executor/plugins/edas/edas.go
index 3416288fc96..eccc156bdee 100644
--- a/modules/scheduler/executor/plugins/edas/edas.go
+++ b/modules/scheduler/executor/plugins/edas/edas.go
@@ -76,7 +76,7 @@ var deleteOptions = &k8sapi.CascadingDeleteOptions{
// 'Foreground' - a cascading policy that deletes all dependents in the foreground
// e.g. if you delete a deployment, this option would delete related replicaSets and pods
// See more: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#delete-24
- PropagationPolicy: "Foreground",
+ PropagationPolicy: string(metav1.DeletePropagationBackground),
}
// EDAS plugin's configure
@@ -1136,6 +1136,11 @@ func (e *EDAS) getAppID(name string) (string, error) {
return "", errors.Errorf("failed to list app, edasCode: %d, message: %s", resp.Code, resp.Message)
}
+ if len(resp.ApplicationList.Application) == 0 {
+ errMsg := "[EDAS] application list count is 0"
+ logrus.Error(errMsg)
+ return "", errors.New(errMsg)
+ }
for _, app := range resp.ApplicationList.Application {
if name == app.Name {
logrus.Infof("[EDAS] Successfully to get app id: %s, name: %s", app.AppId, name)
diff --git a/modules/scheduler/executor/plugins/k8s/k8sapi/types.go b/modules/scheduler/executor/plugins/k8s/k8sapi/types.go
index dc332ecb7d2..e828640b480 100644
--- a/modules/scheduler/executor/plugins/k8s/k8sapi/types.go
+++ b/modules/scheduler/executor/plugins/k8s/k8sapi/types.go
@@ -16,6 +16,7 @@ package k8sapi
import (
v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// NamespacePhase describes phase of a namespace
@@ -69,7 +70,7 @@ var DeleteOptions = &CascadingDeleteOptions{
// 'Foreground' - a cascading policy that deletes all dependents in the foreground
// e.g. if you delete a deployment, this option would delete related replicaSets and pods
// See more: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#delete-24
- PropagationPolicy: "Foreground",
+ PropagationPolicy: string(metav1.DeletePropagationBackground),
}
// CascadingDeleteOptions describe the option of cascading deletion
diff --git a/pkg/database/sqllint/linters/allowed_stmt_linter_test.go b/pkg/database/sqllint/linters/allowed_stmt_linter_test.go
index bec1a747bb2..536ad9bc7dd 100644
--- a/pkg/database/sqllint/linters/allowed_stmt_linter_test.go
+++ b/pkg/database/sqllint/linters/allowed_stmt_linter_test.go
@@ -93,6 +93,11 @@ const (
allowedStmtLinter_SplitRegionStmt = ``
)
+const (
+ begin = "begin;"
+ commit = "commit;"
+)
+
func TestNewAllowedStmtLinter(t *testing.T) {
sqlsList := []string{
allowedStmtLinter_CreateDatabaseStmt,
@@ -184,4 +189,13 @@ func TestNewAllowedStmtLinter(t *testing.T) {
if errors := linterB.Errors(); len(errors["stmt [lints]"]) == 0 {
t.Fatal("fails")
}
+
+ if err := linterA.Input([]byte(begin), "begin"); err != nil {
+ t.Fatal(err)
+ }
+ if errors := linterA.Errors(); len(errors["begin [lints]"]) == 0 {
+ t.Fatal("fails")
+ }
+ data, _ := json.Marshal(linterA.Errors())
+ t.Log("report:", linterA.Report(), "errors:", string(data))
}
diff --git a/pkg/database/sqlparser/migrator/script.go b/pkg/database/sqlparser/migrator/script.go
index 6fa2f3e3ba3..25f86a4cd56 100644
--- a/pkg/database/sqlparser/migrator/script.go
+++ b/pkg/database/sqlparser/migrator/script.go
@@ -117,7 +117,7 @@ func NewScript(workdir, pathFromRepoRoot string) (*Script, error) {
if s.IsBaseline() {
continue
}
- return nil, errors.Errorf("only support DDL and DML, SQL: %s", node.Text())
+ return nil, errors.Errorf("only support DDL and DML, filename: %s, SQL: %s", s.GetName(), node.Text())
}
}
diff --git a/pkg/database/sqlparser/migrator/scripts_test.go b/pkg/database/sqlparser/migrator/scripts_test.go
index 30a6f443a59..662d9e5166f 100644
--- a/pkg/database/sqlparser/migrator/scripts_test.go
+++ b/pkg/database/sqlparser/migrator/scripts_test.go
@@ -90,6 +90,18 @@ func TestNewScripts(t *testing.T) {
}
}
+func TestNewScripts2(t *testing.T) {
+ var p = parameter{
+ workdir: "..",
+ migrationDir: "testdata/new_scripts_test_data2",
+ }
+
+ _, err := migrator.NewScripts(p)
+ if err == nil {
+ t.Fatal("fails")
+ }
+}
+
func TestScripts_AlterPermissionLint(t *testing.T) {
var p = parameter{
workdir: "..",
diff --git a/pkg/database/sqlparser/testdata/new_scripts_test_data2/service_a/20200101_01_first_feature.sql b/pkg/database/sqlparser/testdata/new_scripts_test_data2/service_a/20200101_01_first_feature.sql
new file mode 100644
index 00000000000..1cb347ae7fa
--- /dev/null
+++ b/pkg/database/sqlparser/testdata/new_scripts_test_data2/service_a/20200101_01_first_feature.sql
@@ -0,0 +1,2 @@
+begin ;
+commit ;
\ No newline at end of file
diff --git a/tools/cli/cmd/migrate_lint.go b/tools/cli/cmd/migrate_lint.go
index c52861635e3..372ddec4807 100644
--- a/tools/cli/cmd/migrate_lint.go
+++ b/tools/cli/cmd/migrate_lint.go
@@ -91,6 +91,12 @@ func RunMigrateLint(ctx *command.Context, input, config string, noDetail bool, o
}
func StandardMigrateLint(ctx *command.Context, input, config string) (err error) {
+ defer func() {
+ if err != nil {
+ log.Fatalln(err)
+ }
+ }()
+
var p = scriptsParameters{
migrationDir: input,
rules: nil,