diff --git a/src/autoscaler/collection/TSDCache.go b/src/autoscaler/collection/TSDCache.go new file mode 100644 index 000000000..b717595f6 --- /dev/null +++ b/src/autoscaler/collection/TSDCache.go @@ -0,0 +1,131 @@ +package collection + +import ( + "fmt" + "sync" +) + +type TSD interface { + GetTimestamp() int64 + HasLabels(map[string]string) bool +} + +type TSDCache struct { + lock *sync.RWMutex + data []TSD + capacity int + cursor int + num int +} + +func NewTSDCache(capacity int) *TSDCache { + if capacity <= 0 { + panic("invalid TSDCache capacity") + } + return &TSDCache{ + lock: &sync.RWMutex{}, + data: make([]TSD, capacity, capacity), + capacity: capacity, + cursor: 0, + } +} + +func (c *TSDCache) binarySearch(t int64) int { + if c.num == 0 { + return 0 + } + var l, r int + if c.data[c.cursor] == nil { + l = 0 + r = c.cursor - 1 + } else { + l = c.cursor + r = c.cursor - 1 + c.capacity + } + + for { + if l > r { + return l + } + m := l + (r-l)/2 + if t <= c.data[m%c.capacity].GetTimestamp() { + r = m - 1 + } else { + l = m + 1 + } + } +} + +func (c *TSDCache) Put(d TSD) { + c.lock.Lock() + defer c.lock.Unlock() + + defer func() { + c.num++ + }() + + if c.num == 0 || d.GetTimestamp() >= c.data[((c.cursor-1)+c.capacity)%c.capacity].GetTimestamp() { + c.data[c.cursor] = d + c.cursor = (c.cursor + 1) % c.capacity + return + } + + pos := c.binarySearch(d.GetTimestamp()) + if pos == c.cursor && c.data[c.cursor] != nil { + return + } + + end := c.cursor + if c.data[end] != nil { + end += c.capacity + } + for i := end; i > pos; i-- { + c.data[i%c.capacity] = c.data[(i-1)%c.capacity] + } + c.data[pos%c.capacity] = d + c.cursor = (c.cursor + 1) % c.capacity +} + +func (c *TSDCache) String() string { + c.lock.RLock() + defer c.lock.RUnlock() + + var head, tail int + if c.data[c.cursor] == nil { + head = 0 + tail = c.cursor - 1 + } else { + head = c.cursor + tail = c.cursor + c.capacity - 1 + } + + s := make([]TSD, tail-head+1) + for i := 0; i <= tail-head; i++ { + s[i] = c.data[(i+head)%c.capacity] + } + return fmt.Sprint(s) +} + +// +// Query returns the time series with the timestamp in [start, end) +// If it can not guarantee all the data are in cache, it returns (nil, false) +// +func (c *TSDCache) Query(start, end int64, labels map[string]string) ([]TSD, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + if c.num > c.capacity && c.data[c.cursor].GetTimestamp() >= start { + return nil, false + } + + from := c.binarySearch(start) + to := c.binarySearch(end) + result := []TSD{} + for i := from; i < to; i++ { + d := c.data[i%c.capacity] + if d.HasLabels(labels) { + result = append(result, d) + } + } + return result, true +} diff --git a/src/autoscaler/collection/TSDCache_test.go b/src/autoscaler/collection/TSDCache_test.go new file mode 100644 index 000000000..edf2a9950 --- /dev/null +++ b/src/autoscaler/collection/TSDCache_test.go @@ -0,0 +1,199 @@ +package collection_test + +import ( + "time" + + . "autoscaler/collection" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("TSDCache", func() { + var ( + cache *TSDCache + capacity int + err interface{} + labels map[string]string + ) + + JustBeforeEach(func() { + defer func() { + err = recover() + }() + cache = NewTSDCache(capacity) + }) + + Describe("NewTSDCache", func() { + Context("when creating TSDCache with invalid capacity", func() { + BeforeEach(func() { + capacity = -1 + }) + It("panics", func() { + Expect(err).To(Equal("invalid TSDCache capacity")) + }) + }) + Context("when creating TSDCache with valid capacity", func() { + BeforeEach(func() { + capacity = 10 + + }) + It("returns the TSDCache", func() { + Expect(err).To(BeNil()) + Expect(cache).NotTo(BeNil()) + }) + }) + }) + + Describe("Put", func() { + Context("when cache capacity is 1", func() { + BeforeEach(func() { + capacity = 1 + }) + It("only caches the latest data", func() { + cache.Put(TestTSD{10, nil}) + Expect(cache.String()).To(Equal("[{10 map[]}]")) + cache.Put(TestTSD{20, nil}) + Expect(cache.String()).To(Equal("[{20 map[]}]")) + cache.Put(TestTSD{15, nil}) + Expect(cache.String()).To(Equal("[{20 map[]}]")) + cache.Put(TestTSD{30, nil}) + Expect(cache.String()).To(Equal("[{30 map[]}]")) + }) + }) + Context("when data put to cache do not execeed the capacity", func() { + BeforeEach(func() { + capacity = 5 + }) + It("cache all data in ascending order", func() { + cache.Put(TestTSD{20, nil}) + cache.Put(TestTSD{10, nil}) + cache.Put(TestTSD{40, nil}) + cache.Put(TestTSD{50, nil}) + cache.Put(TestTSD{30, nil}) + Expect(cache.String()).To(Equal("[{10 map[]} {20 map[]} {30 map[]} {40 map[]} {50 map[]}]")) + }) + }) + Context("when data put to cache execeed the capacity", func() { + BeforeEach(func() { + capacity = 3 + }) + It("caches latest data in ascending order", func() { + cache.Put(TestTSD{20, nil}) + Expect(cache.String()).To(Equal("[{20 map[]}]")) + cache.Put(TestTSD{10, nil}) + Expect(cache.String()).To(Equal("[{10 map[]} {20 map[]}]")) + cache.Put(TestTSD{40, nil}) + Expect(cache.String()).To(Equal("[{10 map[]} {20 map[]} {40 map[]}]")) + cache.Put(TestTSD{50, nil}) + Expect(cache.String()).To(Equal("[{20 map[]} {40 map[]} {50 map[]}]")) + cache.Put(TestTSD{30, nil}) + Expect(cache.String()).To(Equal("[{30 map[]} {40 map[]} {50 map[]}]")) + cache.Put(TestTSD{50, nil}) + Expect(cache.String()).To(Equal("[{40 map[]} {50 map[]} {50 map[]}]")) + }) + }) + + }) + + Describe("Query", func() { + Context("when cache is empty", func() { + It("return empty results", func() { + result, ok := cache.Query(0, time.Now().UnixNano(), labels) + Expect(ok).To(BeTrue()) + Expect(result).To(BeEmpty()) + }) + }) + Context("when data put to cache do not execeeds the capacity", func() { + BeforeEach(func() { + capacity = 5 + }) + It("returns the data in [start, end)", func() { + cache.Put(TestTSD{20, nil}) + result, ok := cache.Query(10, 40, labels) + Expect(ok).To(BeTrue()) + Expect(result).To(Equal([]TSD{TestTSD{20, nil}})) + + cache.Put(TestTSD{10, nil}) + result, ok = cache.Query(10, 40, labels) + Expect(ok).To(BeTrue()) + Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}})) + + cache.Put(TestTSD{40, nil}) + result, ok = cache.Query(10, 40, labels) + Expect(ok).To(BeTrue()) + Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}})) + + cache.Put(TestTSD{30, nil}) + result, ok = cache.Query(10, 40, labels) + Expect(ok).To(BeTrue()) + Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}, TestTSD{30, nil}})) + + cache.Put(TestTSD{50, nil}) + result, ok = cache.Query(10, 
40, labels)
+				Expect(ok).To(BeTrue())
+				Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}, TestTSD{30, nil}}))
+			})
+		})
+
+		Context("when data put to cache exceeds the capacity", func() {
+			BeforeEach(func() {
+				capacity = 3
+			})
+
+			Context("when all queried data are guaranteed to be in cache", func() {
+				It("returns the data in [start, end)", func() {
+					cache.Put(TestTSD{20, nil})
+					cache.Put(TestTSD{10, nil})
+					cache.Put(TestTSD{40, nil})
+					cache.Put(TestTSD{30, nil})
+
+					result, ok := cache.Query(30, 50, labels)
+					Expect(ok).To(BeTrue())
+					Expect(result).To(Equal([]TSD{TestTSD{30, nil}, TestTSD{40, nil}}))
+
+					cache.Put(TestTSD{50, nil})
+					result, ok = cache.Query(35, 50, labels)
+					Expect(ok).To(BeTrue())
+					Expect(result).To(Equal([]TSD{TestTSD{40, nil}}))
+				})
+				Context("when querying with labels", func() {
+					BeforeEach(func() {
+						capacity = 5
+					})
+					It("returns the data with all the labels", func() {
+						cache.Put(TestTSD{20, map[string]string{"tom": "cat", "pig": "pepper"}})
+						cache.Put(TestTSD{10, nil})
+						cache.Put(TestTSD{40, map[string]string{"jerry": "mouse", "tom": "cat", "peppa": "pig"}})
+						cache.Put(TestTSD{30, map[string]string{"jerry": "mouse"}})
+						cache.Put(TestTSD{50, nil})
+
+						result, ok := cache.Query(20, 60, map[string]string{"jerry": "mouse", "tom": "cat"})
+						Expect(ok).To(BeTrue())
+						Expect(result).To(Equal([]TSD{TestTSD{40, map[string]string{"jerry": "mouse", "tom": "cat", "peppa": "pig"}}}))
+					})
+
+				})
+
+			})
+			Context("when queried data are possibly not in cache", func() {
+				It("returns false", func() {
+					cache.Put(TestTSD{20, nil})
+					cache.Put(TestTSD{10, nil})
+					cache.Put(TestTSD{40, nil})
+					cache.Put(TestTSD{30, nil})
+
+					_, ok := cache.Query(10, 50, labels)
+					Expect(ok).To(BeFalse())
+
+					cache.Put(TestTSD{50, nil})
+					_, ok = cache.Query(30, 50, labels)
+					Expect(ok).To(BeFalse())
+				})
+
+			})
+
+		})
+
+	})
+})
diff --git a/src/autoscaler/collection/collection_suite_test.go b/src/autoscaler/collection/collection_suite_test.go
new file mode 100644
index 000000000..f8d3c92aa
--- /dev/null
+++ b/src/autoscaler/collection/collection_suite_test.go
@@ -0,0 +1,32 @@
+package collection_test
+
+import (
+	. "github.com/onsi/ginkgo"
+	. 
"github.com/onsi/gomega" + + "testing" +) + +func TestCollection(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Collection Suite") +} + +type TestTSD struct { + timestamp int64 + labels map[string]string +} + +func (t TestTSD) GetTimestamp() int64 { + return t.timestamp +} + +func (t TestTSD) HasLabels(labels map[string]string) bool { + for k, v := range labels { + if t.labels[k] == v { + continue + } + return false + } + return true +} diff --git a/src/autoscaler/metricscollector/cmd/metricscollector/main.go b/src/autoscaler/metricscollector/cmd/metricscollector/main.go index e6c956a21..2aa737ace 100644 --- a/src/autoscaler/metricscollector/cmd/metricscollector/main.go +++ b/src/autoscaler/metricscollector/cmd/metricscollector/main.go @@ -91,21 +91,22 @@ func main() { var createAppCollector func(string, chan *models.AppInstanceMetric) collector.AppCollector if conf.Collector.CollectMethod == config.CollectMethodPolling { createAppCollector = func(appId string, dataChan chan *models.AppInstanceMetric) collector.AppCollector { - return collector.NewAppPoller(logger.Session("app-poller"), appId, conf.Collector.CollectInterval, cfClient, noaa, mcClock, dataChan) + return collector.NewAppPoller(logger.Session("app-poller"), appId, conf.Collector.CollectInterval, conf.Collector.MetricCacheSizePerApp, cfClient, noaa, mcClock, dataChan) } } else { createAppCollector = func(appId string, dataChan chan *models.AppInstanceMetric) collector.AppCollector { noaaConsumer := consumer.New(dopplerUrl, tlsConfig, nil) noaaConsumer.RefreshTokenFrom(cfClient) - return collector.NewAppStreamer(logger.Session("app-streamer"), appId, conf.Collector.CollectInterval, cfClient, noaaConsumer, mcClock, dataChan) + return collector.NewAppStreamer(logger.Session("app-streamer"), appId, conf.Collector.CollectInterval, conf.Collector.MetricCacheSizePerApp, cfClient, noaaConsumer, mcClock, dataChan) } } + mc := collector.NewCollector(conf.Collector.RefreshInterval, conf.Collector.CollectInterval, conf.Collector.SaveInterval, + conf.Server.NodeIndex, len(conf.Server.NodeAddrs), logger.Session("collector"), + policyDB, instanceMetricsDB, mcClock, createAppCollector) + collectServer := ifrit.RunFunc(func(signals <-chan os.Signal, ready chan<- struct{}) error { logger.Info("starting collector", lager.Data{"NodeIndex": conf.Server.NodeIndex, "NodeAddrs": conf.Server.NodeAddrs}) - mc := collector.NewCollector(conf.Collector.RefreshInterval, conf.Collector.CollectInterval, conf.Collector.SaveInterval, - conf.Server.NodeIndex, len(conf.Server.NodeAddrs), logger.Session("collector"), - policyDB, instanceMetricsDB, mcClock, createAppCollector) mc.Start() close(ready) @@ -116,7 +117,7 @@ func main() { return nil }) - httpServer, err := server.NewServer(logger.Session("http_server"), conf, cfClient, noaa, instanceMetricsDB) + httpServer, err := server.NewServer(logger.Session("http_server"), conf, cfClient, noaa, mc.QueryMetrics, instanceMetricsDB) if err != nil { logger.Error("failed to create http server", err) os.Exit(1) diff --git a/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go b/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go index ffbfebd98..d3eb4ee06 100644 --- a/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go +++ b/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go @@ -157,6 +157,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { cfg.Collector.RefreshInterval = 30 * 
time.Second cfg.Collector.CollectMethod = config.CollectMethodPolling cfg.Collector.SaveInterval = 5 * time.Second + cfg.Collector.MetricCacheSizePerApp = 100 configFile = writeConfig(&cfg) diff --git a/src/autoscaler/metricscollector/collector/app_streamer.go b/src/autoscaler/metricscollector/collector/app_streamer.go index bd04aa94f..88632e08c 100644 --- a/src/autoscaler/metricscollector/collector/app_streamer.go +++ b/src/autoscaler/metricscollector/collector/app_streamer.go @@ -2,6 +2,7 @@ package collector import ( "autoscaler/cf" + "autoscaler/collection" "autoscaler/metricscollector/noaa" "autoscaler/models" @@ -17,6 +18,7 @@ type appStreamer struct { appId string logger lager.Logger collectInterval time.Duration + cache *collection.TSDCache cfc cf.CFClient noaaConsumer noaa.NoaaConsumer doneChan chan bool @@ -27,11 +29,12 @@ type appStreamer struct { dataChan chan *models.AppInstanceMetric } -func NewAppStreamer(logger lager.Logger, appId string, interval time.Duration, cfc cf.CFClient, noaaConsumer noaa.NoaaConsumer, sclock clock.Clock, dataChan chan *models.AppInstanceMetric) AppCollector { +func NewAppStreamer(logger lager.Logger, appId string, interval time.Duration, cacheSize int, cfc cf.CFClient, noaaConsumer noaa.NoaaConsumer, sclock clock.Clock, dataChan chan *models.AppInstanceMetric) AppCollector { return &appStreamer{ appId: appId, logger: logger, collectInterval: interval, + cache: collection.NewTSDCache(cacheSize), cfc: cfc, noaaConsumer: noaaConsumer, doneChan: make(chan bool), @@ -97,11 +100,13 @@ func (as *appStreamer) processEvent(event *events.Envelope) { metric := noaa.GetInstanceMemoryUsedMetricFromContainerMetricEvent(as.sclock.Now().UnixNano(), as.appId, event) as.logger.Debug("process-event-get-memoryused-metric", lager.Data{"metric": metric}) if metric != nil { + as.cache.Put(metric) as.dataChan <- metric } metric = noaa.GetInstanceMemoryUtilMetricFromContainerMetricEvent(as.sclock.Now().UnixNano(), as.appId, event) as.logger.Debug("process-event-get-memoryutil-metric", lager.Data{"metric": metric}) if metric != nil { + as.cache.Put(metric) as.dataChan <- metric } @@ -128,6 +133,7 @@ func (as *appStreamer) computeAndSaveMetrics() { Timestamp: as.sclock.Now().UnixNano(), } as.logger.Debug("compute-throughput", lager.Data{"message": "write 0 throughput due to no requests"}) + as.cache.Put(throughput) as.dataChan <- throughput return } @@ -143,7 +149,7 @@ func (as *appStreamer) computeAndSaveMetrics() { Timestamp: as.sclock.Now().UnixNano(), } as.logger.Debug("compute-throughput", lager.Data{"throughput": throughput}) - + as.cache.Put(throughput) as.dataChan <- throughput responseTime := &models.AppInstanceMetric{ @@ -156,10 +162,14 @@ func (as *appStreamer) computeAndSaveMetrics() { Timestamp: as.sclock.Now().UnixNano(), } as.logger.Debug("compute-responsetime", lager.Data{"responsetime": responseTime}) - + as.cache.Put(responseTime) as.dataChan <- responseTime } as.numRequests = make(map[int32]int64) as.sumReponseTimes = make(map[int32]int64) } + +func (as *appStreamer) Query(start, end int64, labels map[string]string) ([]collection.TSD, bool) { + return as.cache.Query(start, end, labels) +} diff --git a/src/autoscaler/metricscollector/collector/app_streamer_test.go b/src/autoscaler/metricscollector/collector/app_streamer_test.go index 08971cdf4..4e477a7f5 100644 --- a/src/autoscaler/metricscollector/collector/app_streamer_test.go +++ b/src/autoscaler/metricscollector/collector/app_streamer_test.go @@ -2,6 +2,7 @@ package collector_test import ( 
"autoscaler/cf" + "autoscaler/collection" . "autoscaler/metricscollector/collector" "autoscaler/metricscollector/fakes" "autoscaler/metricscollector/noaa" @@ -29,6 +30,7 @@ var _ = Describe("AppStreamer", func() { errChan chan error fclock *fakeclock.FakeClock dataChan chan *models.AppInstanceMetric + cacheSize int ) BeforeEach(func() { @@ -39,8 +41,9 @@ var _ = Describe("AppStreamer", func() { buffer = logger.Buffer() fclock = fakeclock.NewFakeClock(time.Now()) dataChan = make(chan *models.AppInstanceMetric) + cacheSize = 100 - streamer = NewAppStreamer(logger, "an-app-id", TestCollectInterval, cfc, noaaConsumer, fclock, dataChan) + streamer = NewAppStreamer(logger, "an-app-id", TestCollectInterval, cacheSize, cfc, noaaConsumer, fclock, dataChan) msgChan = make(chan *events.Envelope) errChan = make(chan error, 1) @@ -72,50 +75,68 @@ var _ = Describe("AppStreamer", func() { msgChan <- noaa.NewContainerEnvelope(222222, "an-app-id", 1, 30.6, 200000000, 1000000000, 300000000, 2000000000) }() }) - It("sends container metrics to channel", func() { - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ - AppId: "an-app-id", - InstanceIndex: 0, - CollectedAt: fclock.Now().UnixNano(), - Name: models.MetricNameMemoryUsed, - Unit: models.UnitMegaBytes, - Value: "95", - Timestamp: 111111, - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ - AppId: "an-app-id", - InstanceIndex: 0, - CollectedAt: fclock.Now().UnixNano(), - Name: models.MetricNameMemoryUtil, - Unit: models.UnitPercentage, - Value: "33", - Timestamp: 111111, - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ - AppId: "an-app-id", - InstanceIndex: 1, - CollectedAt: fclock.Now().UnixNano(), - Name: models.MetricNameMemoryUsed, - Unit: models.UnitMegaBytes, - Value: "191", - Timestamp: 222222, - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ - AppId: "an-app-id", - InstanceIndex: 1, - CollectedAt: fclock.Now().UnixNano(), - Name: models.MetricNameMemoryUtil, - Unit: models.UnitPercentage, - Value: "67", - Timestamp: 222222, - })) + It("sends container metrics to channel and cache", func() { + metrics := []*models.AppInstanceMetric{ + &models.AppInstanceMetric{ + AppId: "an-app-id", + InstanceIndex: 0, + CollectedAt: fclock.Now().UnixNano(), + Name: models.MetricNameMemoryUsed, + Unit: models.UnitMegaBytes, + Value: "95", + Timestamp: 111111, + }, + &models.AppInstanceMetric{ + AppId: "an-app-id", + InstanceIndex: 0, + CollectedAt: fclock.Now().UnixNano(), + Name: models.MetricNameMemoryUtil, + Unit: models.UnitPercentage, + Value: "33", + Timestamp: 111111, + }, + &models.AppInstanceMetric{ + AppId: "an-app-id", + InstanceIndex: 1, + CollectedAt: fclock.Now().UnixNano(), + Name: models.MetricNameMemoryUsed, + Unit: models.UnitMegaBytes, + Value: "191", + Timestamp: 222222, + }, + &models.AppInstanceMetric{ + AppId: "an-app-id", + InstanceIndex: 1, + CollectedAt: fclock.Now().UnixNano(), + Name: models.MetricNameMemoryUtil, + Unit: models.UnitPercentage, + Value: "67", + Timestamp: 222222, + }} + + Expect(<-dataChan).To(Equal(metrics[0])) + Expect(<-dataChan).To(Equal(metrics[1])) + Expect(<-dataChan).To(Equal(metrics[2])) + Expect(<-dataChan).To(Equal(metrics[3])) + + data, ok := streamer.Query(0, 333333, map[string]string{models.MetricLabelName: models.MetricNameMemoryUsed}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metrics[0], metrics[2]})) + + data, ok = streamer.Query(0, 333333, map[string]string{models.MetricLabelName: models.MetricNameMemoryUtil}) + 
Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metrics[1], metrics[3]})) + + data, ok = streamer.Query(0, 333333, map[string]string{models.MetricLabelName: models.MetricNameThroughput}) + Expect(ok).To(BeTrue()) + Expect(data).To(BeEmpty()) By("collecting and computing throughput") Consistently(dataChan).ShouldNot(Receive()) By("sending throughput after the collect interval") fclock.WaitForWatcherAndIncrement(TestCollectInterval) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + metric := &models.AppInstanceMetric{ AppId: "an-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -123,7 +144,13 @@ var _ = Describe("AppStreamer", func() { Unit: models.UnitRPS, Value: "0", Timestamp: fclock.Now().UnixNano(), - })) + } + Expect(<-dataChan).To(Equal(metric)) + + data, ok = streamer.Query(0, fclock.Now().UnixNano()+1, map[string]string{models.MetricLabelName: models.MetricNameThroughput}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metric})) + }) }) @@ -134,7 +161,7 @@ var _ = Describe("AppStreamer", func() { fclock.WaitForWatcherAndIncrement(TestCollectInterval) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + metric1 := &models.AppInstanceMetric{ AppId: "an-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -142,8 +169,8 @@ var _ = Describe("AppStreamer", func() { Unit: models.UnitRPS, Value: "2", Timestamp: fclock.Now().UnixNano(), - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + } + metric2 := &models.AppInstanceMetric{ AppId: "an-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -151,7 +178,14 @@ var _ = Describe("AppStreamer", func() { Unit: models.UnitMilliseconds, Value: "200", Timestamp: fclock.Now().UnixNano(), - })) + } + + Expect(<-dataChan).To(Equal(metric1)) + Expect(<-dataChan).To(Equal(metric2)) + + data, ok := streamer.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metric1, metric2})) msgChan <- noaa.NewHttpStartStopEnvelope(333333, 100000000, 300000000, 1) msgChan <- noaa.NewHttpStartStopEnvelope(555555, 300000000, 600000000, 1) @@ -160,7 +194,7 @@ var _ = Describe("AppStreamer", func() { fclock.WaitForWatcherAndIncrement(TestCollectInterval) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + metric3 := &models.AppInstanceMetric{ AppId: "an-app-id", InstanceIndex: 1, CollectedAt: fclock.Now().UnixNano(), @@ -168,8 +202,8 @@ var _ = Describe("AppStreamer", func() { Unit: models.UnitRPS, Value: "3", Timestamp: fclock.Now().UnixNano(), - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + } + metric4 := &models.AppInstanceMetric{ AppId: "an-app-id", InstanceIndex: 1, CollectedAt: fclock.Now().UnixNano(), @@ -177,7 +211,15 @@ var _ = Describe("AppStreamer", func() { Unit: models.UnitMilliseconds, Value: "300", Timestamp: fclock.Now().UnixNano(), - })) + } + + Expect(<-dataChan).To(Equal(metric3)) + Expect(<-dataChan).To(Equal(metric4)) + + data, ok = streamer.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metric1, metric2, metric3, metric4})) + }) Context("when the app has multiple instances", func() { @@ -196,6 +238,11 @@ var _ = Describe("AppStreamer", func() { Eventually(dataChan).Should(Receive()) Eventually(dataChan).Should(Receive()) Consistently(dataChan).ShouldNot(Receive()) + + data, ok := streamer.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) 
+ Expect(data).To(HaveLen(6)) + }) }) }) @@ -211,7 +258,8 @@ var _ = Describe("AppStreamer", func() { By("sending throughput after the collect interval") fclock.WaitForWatcherAndIncrement(TestCollectInterval) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + + metric := &models.AppInstanceMetric{ AppId: "an-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -219,7 +267,12 @@ var _ = Describe("AppStreamer", func() { Unit: models.UnitRPS, Value: "0", Timestamp: fclock.Now().UnixNano(), - })) + } + + Expect(<-dataChan).To(Equal(metric)) + data, ok := streamer.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metric})) }) }) Context("when there is error streaming events", func() { diff --git a/src/autoscaler/metricscollector/collector/apppoller.go b/src/autoscaler/metricscollector/collector/apppoller.go index ba2cba34f..f5adb21d7 100644 --- a/src/autoscaler/metricscollector/collector/apppoller.go +++ b/src/autoscaler/metricscollector/collector/apppoller.go @@ -2,6 +2,7 @@ package collector import ( "autoscaler/cf" + "autoscaler/collection" "autoscaler/metricscollector/noaa" "autoscaler/models" @@ -16,6 +17,7 @@ import ( type appPoller struct { appId string collectInterval time.Duration + cache *collection.TSDCache logger lager.Logger cfc cf.CFClient noaaConsumer noaa.NoaaConsumer @@ -24,10 +26,11 @@ type appPoller struct { dataChan chan *models.AppInstanceMetric } -func NewAppPoller(logger lager.Logger, appId string, collectInterval time.Duration, cfc cf.CFClient, noaaConsumer noaa.NoaaConsumer, pclock clock.Clock, dataChan chan *models.AppInstanceMetric) AppCollector { +func NewAppPoller(logger lager.Logger, appId string, collectInterval time.Duration, cacheSize int, cfc cf.CFClient, noaaConsumer noaa.NoaaConsumer, pclock clock.Clock, dataChan chan *models.AppInstanceMetric) AppCollector { return &appPoller{ appId: appId, collectInterval: collectInterval, + cache: collection.NewTSDCache(cacheSize), logger: logger, cfc: cfc, noaaConsumer: noaaConsumer, @@ -87,6 +90,11 @@ func (ap *appPoller) pollMetric() { logger.Debug("poll-metric-get-memory-metrics", lager.Data{"metrics": metrics}) for _, metric := range metrics { + ap.cache.Put(metric) ap.dataChan <- metric } } + +func (ap *appPoller) Query(start, end int64, labels map[string]string) ([]collection.TSD, bool) { + return ap.cache.Query(start, end, labels) +} diff --git a/src/autoscaler/metricscollector/collector/apppoller_test.go b/src/autoscaler/metricscollector/collector/apppoller_test.go index 3feb4cb0b..2ec796396 100644 --- a/src/autoscaler/metricscollector/collector/apppoller_test.go +++ b/src/autoscaler/metricscollector/collector/apppoller_test.go @@ -2,6 +2,7 @@ package collector_test import ( "autoscaler/cf" + "autoscaler/collection" . 
"autoscaler/metricscollector/collector" "autoscaler/metricscollector/fakes" "autoscaler/models" @@ -28,6 +29,7 @@ var _ = Describe("Apppoller", func() { buffer *gbytes.Buffer timestamp int64 dataChan chan *models.AppInstanceMetric + cacheSize int ) BeforeEach(func() { @@ -39,8 +41,9 @@ var _ = Describe("Apppoller", func() { fclock = fakeclock.NewFakeClock(time.Now()) dataChan = make(chan *models.AppInstanceMetric) + cacheSize = 100 - poller = NewAppPoller(logger, "test-app-id", TestCollectInterval, cfc, noaa, fclock, dataChan) + poller = NewAppPoller(logger, "test-app-id", TestCollectInterval, cacheSize, cfc, noaa, fclock, dataChan) timestamp = 111111 }) @@ -90,8 +93,8 @@ var _ = Describe("Apppoller", func() { } }) - It("sends the metrics to channel", func() { - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + It("sends the metrics to channel and cache", func() { + metric1 := &models.AppInstanceMetric{ AppId: "test-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -99,8 +102,9 @@ var _ = Describe("Apppoller", func() { Unit: models.UnitMegaBytes, Value: "95", Timestamp: 111111, - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + } + + metric2 := &models.AppInstanceMetric{ AppId: "test-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -108,15 +112,28 @@ var _ = Describe("Apppoller", func() { Unit: models.UnitPercentage, Value: "33", Timestamp: 111111, - })) + } + + Expect(<-dataChan).To(Equal(metric1)) + Expect(<-dataChan).To(Equal(metric2)) + data, ok := poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metric1, metric2})) fclock.WaitForWatcherAndIncrement(TestCollectInterval) Eventually(dataChan).Should(Receive()) Eventually(dataChan).Should(Receive()) + data, ok = poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(HaveLen(4)) fclock.WaitForWatcherAndIncrement(TestCollectInterval) Eventually(dataChan).Should(Receive()) Eventually(dataChan).Should(Receive()) + data, ok = poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(HaveLen(6)) + }) }) @@ -130,7 +147,7 @@ var _ = Describe("Apppoller", func() { } }) - It("sends nothing to the channel", func() { + It("sends nothing to the channel and cache", func() { Consistently(dataChan).ShouldNot(Receive()) fclock.WaitForWatcherAndIncrement(TestCollectInterval) @@ -138,6 +155,10 @@ var _ = Describe("Apppoller", func() { fclock.WaitForWatcherAndIncrement(TestCollectInterval) Consistently(dataChan).ShouldNot(Receive()) + + data, ok := poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(BeEmpty()) }) }) @@ -166,8 +187,8 @@ var _ = Describe("Apppoller", func() { } }) - It("sends metrics in non-empty container envelops to channel", func() { - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + It("sends metrics in non-empty container envelops to channel and cache", func() { + metric1 := &models.AppInstanceMetric{ AppId: "test-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -175,8 +196,8 @@ var _ = Describe("Apppoller", func() { Unit: models.UnitMegaBytes, Value: "95", Timestamp: 111111, - })) - Expect(<-dataChan).To(Equal(&models.AppInstanceMetric{ + } + metric2 := &models.AppInstanceMetric{ AppId: "test-app-id", InstanceIndex: 0, CollectedAt: fclock.Now().UnixNano(), @@ -184,14 +205,25 @@ var _ = Describe("Apppoller", func() { 
Unit: models.UnitPercentage, Value: "33", Timestamp: 111111, - })) + } + Expect(<-dataChan).To(Equal(metric1)) + Expect(<-dataChan).To(Equal(metric2)) + data, ok := poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]collection.TSD{metric1, metric2})) fclock.WaitForWatcherAndIncrement(TestCollectInterval) Consistently(dataChan).ShouldNot(Receive()) + data, ok = poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(HaveLen(2)) fclock.WaitForWatcherAndIncrement(TestCollectInterval) Eventually(dataChan).Should(Receive()) Eventually(dataChan).Should(Receive()) + data, ok = poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(HaveLen(4)) }) }) @@ -203,7 +235,7 @@ var _ = Describe("Apppoller", func() { noaa.ContainerEnvelopesReturns(nil, errors.New("test apppoller error")) }) - It("sends nothing to the channel and logs the errors", func() { + It("sends nothing to the channel/cache and logs the errors", func() { Eventually(buffer).Should(gbytes.Say("poll-metric-from-noaa")) Eventually(buffer).Should(gbytes.Say("test apppoller error")) Consistently(dataChan).ShouldNot(Receive()) @@ -212,6 +244,11 @@ var _ = Describe("Apppoller", func() { Eventually(buffer).Should(gbytes.Say("poll-metric-from-noaa")) Eventually(buffer).Should(gbytes.Say("test apppoller error")) Consistently(dataChan).ShouldNot(Receive()) + + data, ok := poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(BeEmpty()) + }) }) @@ -243,14 +280,14 @@ var _ = Describe("Apppoller", func() { It("polls container envelopes successfully; logs the retries and errors", func() { Eventually(buffer).Should(gbytes.Say("poll-metric-from-noaa-retry")) - Eventually(buffer).Should(gbytes.Say("poll-metric-from-noaa-retry")) - Eventually(buffer).Should(gbytes.Say("poll-metric-get-memory-metric")) Eventually(noaa.ContainerEnvelopesCallCount).Should(Equal(3)) - Eventually(dataChan).Should(Receive()) Eventually(dataChan).Should(Receive()) + data, ok := poller.Query(0, fclock.Now().UnixNano()+1, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(HaveLen(2)) }) }) diff --git a/src/autoscaler/metricscollector/collector/collector.go b/src/autoscaler/metricscollector/collector/collector.go index bee780256..8aeaf4e90 100644 --- a/src/autoscaler/metricscollector/collector/collector.go +++ b/src/autoscaler/metricscollector/collector/collector.go @@ -1,6 +1,7 @@ package collector import ( + "autoscaler/collection" "autoscaler/db" "autoscaler/helpers" "autoscaler/models" @@ -11,9 +12,12 @@ import ( "code.cloudfoundry.org/lager" ) +type MetricQueryFunc func(string, int64, int64, db.OrderType, map[string]string) ([]*models.AppInstanceMetric, bool) + type AppCollector interface { Start() Stop() + Query(int64, int64, map[string]string) ([]collection.TSD, bool) } type Collector struct { @@ -31,7 +35,7 @@ type Collector struct { doneSaveChan chan bool appCollectors map[string]AppCollector ticker clock.Ticker - lock *sync.Mutex + lock *sync.RWMutex dataChan chan *models.AppInstanceMetric } @@ -52,7 +56,7 @@ func NewCollector(refreshInterval time.Duration, collectInterval time.Duration, doneChan: make(chan bool), doneSaveChan: make(chan bool), appCollectors: make(map[string]AppCollector), - lock: &sync.Mutex{}, + lock: &sync.RWMutex{}, dataChan: make(chan *models.AppInstanceMetric), } } @@ -121,25 +125,50 @@ func (c *Collector) Stop() { 
close(c.doneChan) close(c.doneSaveChan) - c.lock.Lock() + c.lock.RLock() for _, ap := range c.appCollectors { ap.Stop() } - c.lock.Unlock() + c.lock.RUnlock() } c.logger.Info("collector-stopped") } func (c *Collector) GetCollectorAppIds() []string { var appIds []string - c.lock.Lock() + c.lock.RLock() for id := range c.appCollectors { appIds = append(appIds, id) } - c.lock.Unlock() + c.lock.RUnlock() return appIds } +func (c *Collector) QueryMetrics(appID string, start, end int64, order db.OrderType, labels map[string]string) ([]*models.AppInstanceMetric, bool) { + c.lock.RLock() + appCollector, exist := c.appCollectors[appID] + c.lock.RUnlock() + if !exist { + return nil, false + } + + result, ok := appCollector.Query(start, end, labels) + if !ok { + return nil, false + } + metrics := make([]*models.AppInstanceMetric, len(result)) + if order == db.ASC { + for index, tsd := range result { + metrics[index] = tsd.(*models.AppInstanceMetric) + } + } else { + for index, tsd := range result { + metrics[len(result)-1-index] = tsd.(*models.AppInstanceMetric) + } + } + return metrics, true +} + func (c *Collector) SaveMetricsInDB() { ticker := c.cclock.NewTicker(c.saveInterval) metrics := make([]*models.AppInstanceMetric, 0) diff --git a/src/autoscaler/metricscollector/collector/collector_test.go b/src/autoscaler/metricscollector/collector/collector_test.go index 3c7cb3278..dfd60bf7b 100644 --- a/src/autoscaler/metricscollector/collector/collector_test.go +++ b/src/autoscaler/metricscollector/collector/collector_test.go @@ -1,6 +1,8 @@ package collector_test import ( + "autoscaler/collection" + "autoscaler/db" . "autoscaler/metricscollector/collector" "autoscaler/metricscollector/fakes" "autoscaler/models" @@ -29,6 +31,7 @@ var _ = Describe("Collector", func() { nodeNum int nodeIndex int createAppCollector func(string, chan *models.AppInstanceMetric) AppCollector + metric1, metric2 *models.AppInstanceMetric ) BeforeEach(func() { @@ -303,4 +306,66 @@ var _ = Describe("Collector", func() { }) }) + Describe("QueryMetrics", func() { + JustBeforeEach(func() { + coll = NewCollector(TestRefreshInterval, TestCollectInterval, TestSaveInterval, nodeIndex, nodeNum, logger, policyDb, instanceMetricsDb, fclock, createAppCollector) + coll.Start() + }) + BeforeEach(func() { + policyDb.GetAppIdsReturns(map[string]bool{"an-app-id": true}, nil) + metric1 = &models.AppInstanceMetric{ + AppId: "an-app-id", + InstanceIndex: 1, + CollectedAt: 2222, + Name: models.MetricNameThroughput, + Unit: models.UnitRPS, + Value: "3", + Timestamp: 2222, + } + metric2 = &models.AppInstanceMetric{ + AppId: "an-app-id", + InstanceIndex: 1, + CollectedAt: 3333, + Name: models.MetricNameResponseTime, + Unit: models.UnitMilliseconds, + Value: "300", + Timestamp: 3333, + } + appCollector.QueryReturns([]collection.TSD{metric1, metric2}, true) + }) + + Context("when cache hits", func() { + It("returns the data in cache", func() { + Eventually(policyDb.GetAppIdsCallCount).Should(Equal(1)) + + data, ok := coll.QueryMetrics("an-app-id", 0, 5555, db.ASC, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]*models.AppInstanceMetric{metric1, metric2})) + + data, ok = coll.QueryMetrics("an-app-id", 0, 5555, db.DESC, map[string]string{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal([]*models.AppInstanceMetric{metric2, metric1})) + }) + }) + Context("when cache misses", func() { + BeforeEach(func() { + appCollector.QueryReturns(nil, false) + }) + It("returns false", func() { + _, ok := coll.QueryMetrics("an-app-id", 0, 
5555, db.ASC, map[string]string{}) + Expect(ok).To(BeFalse()) + }) + }) + Context("when app does not exist", func() { + It("returns false", func() { + _, ok := coll.QueryMetrics("non-exist-app-id", 0, 5555, db.ASC, map[string]string{}) + Expect(ok).To(BeFalse()) + }) + }) + + AfterEach(func() { + coll.Stop() + }) + }) + }) diff --git a/src/autoscaler/metricscollector/config/config.go b/src/autoscaler/metricscollector/config/config.go index c1cda62f6..eee66986f 100644 --- a/src/autoscaler/metricscollector/config/config.go +++ b/src/autoscaler/metricscollector/config/config.go @@ -16,10 +16,11 @@ import ( ) const ( - DefaultLoggingLevel = "info" - DefaultRefreshInterval time.Duration = 60 * time.Second - DefaultCollectInterval time.Duration = 30 * time.Second - DefaultSaveInterval time.Duration = 5 * time.Second + DefaultLoggingLevel = "info" + DefaultRefreshInterval time.Duration = 60 * time.Second + DefaultCollectInterval time.Duration = 30 * time.Second + DefaultSaveInterval time.Duration = 5 * time.Second + DefaultMetricCacheSizePerApp = 1000 CollectMethodPolling = "polling" CollectMethodStreaming = "streaming" @@ -51,17 +52,19 @@ type DBConfig struct { } type CollectorConfig struct { - RefreshInterval time.Duration `yaml:"refresh_interval"` - CollectInterval time.Duration `yaml:"collect_interval"` - CollectMethod string `yaml:"collect_method"` - SaveInterval time.Duration `yaml:"save_interval"` + RefreshInterval time.Duration `yaml:"refresh_interval"` + CollectInterval time.Duration `yaml:"collect_interval"` + CollectMethod string `yaml:"collect_method"` + SaveInterval time.Duration `yaml:"save_interval"` + MetricCacheSizePerApp int `yaml:"metric_cache_size_per_app"` } var defaultCollectorConfig = CollectorConfig{ - RefreshInterval: DefaultRefreshInterval, - CollectInterval: DefaultCollectInterval, - CollectMethod: CollectMethodStreaming, - SaveInterval: DefaultSaveInterval, + RefreshInterval: DefaultRefreshInterval, + CollectInterval: DefaultCollectInterval, + CollectMethod: CollectMethodStreaming, + SaveInterval: DefaultSaveInterval, + MetricCacheSizePerApp: DefaultMetricCacheSizePerApp, } type Config struct { @@ -98,6 +101,7 @@ func LoadConfig(reader io.Reader) (*Config, error) { } func (c *Config) Validate() error { + err := c.CF.Validate() if err != nil { return err @@ -127,6 +131,10 @@ func (c *Config) Validate() error { return fmt.Errorf("Configuration error: invalid collector.collect_method") } + if c.Collector.MetricCacheSizePerApp <= 0 { + return fmt.Errorf("Configuration error: invalid collector.metric_cache_size_per_app") + } + if (c.Server.NodeIndex >= len(c.Server.NodeAddrs)) || (c.Server.NodeIndex < 0) { return fmt.Errorf("Configuration error: server.node_index out of range") } diff --git a/src/autoscaler/metricscollector/config/config_test.go b/src/autoscaler/metricscollector/config/config_test.go index de22d1b9b..ffee102d8 100644 --- a/src/autoscaler/metricscollector/config/config_test.go +++ b/src/autoscaler/metricscollector/config/config_test.go @@ -81,6 +81,7 @@ collector: collect_interval: 10s collect_method: polling save_interval: 5s + metric_cache_size_per_app: 100 `) }) @@ -117,6 +118,7 @@ collector: Expect(conf.Collector.RefreshInterval).To(Equal(20 * time.Second)) Expect(conf.Collector.CollectInterval).To(Equal(10 * time.Second)) Expect(conf.Collector.CollectMethod).To(Equal(CollectMethodPolling)) + Expect(conf.Collector.MetricCacheSizePerApp).To(Equal(100)) Expect(conf.Server.TLS.KeyFile).To(Equal("/var/vcap/jobs/autoscaler/config/certs/server.key")) 
Expect(conf.Server.TLS.CertFile).To(Equal("/var/vcap/jobs/autoscaler/config/certs/server.crt")) @@ -163,6 +165,7 @@ db: Expect(conf.Collector.RefreshInterval).To(Equal(DefaultRefreshInterval)) Expect(conf.Collector.CollectInterval).To(Equal(DefaultCollectInterval)) Expect(conf.Collector.CollectMethod).To(Equal(CollectMethodStreaming)) + Expect(conf.Collector.MetricCacheSizePerApp).To(Equal(DefaultMetricCacheSizePerApp)) }) }) @@ -645,6 +648,7 @@ collector: conf.Collector.RefreshInterval = time.Duration(60 * time.Second) conf.Collector.CollectMethod = CollectMethodPolling conf.Collector.SaveInterval = time.Duration(5 * time.Second) + conf.Collector.MetricCacheSizePerApp = 100 conf.Server.NodeAddrs = []string{"address1", "address2"} conf.Server.NodeIndex = 0 }) @@ -729,6 +733,15 @@ collector: }) }) + Context("when metrics cache size per app is invalid", func() { + BeforeEach(func() { + conf.Collector.MetricCacheSizePerApp = 0 + }) + It("should error", func() { + Expect(err).To(MatchError("Configuration error: invalid collector.metric_cache_size_per_app")) + }) + }) + Context("when node index is out of range", func() { Context("when node index is negative", func() { BeforeEach(func() { diff --git a/src/autoscaler/metricscollector/fakes/fake_app_collector.go b/src/autoscaler/metricscollector/fakes/fake_app_collector.go index c9f730b41..7ea249ea2 100644 --- a/src/autoscaler/metricscollector/fakes/fake_app_collector.go +++ b/src/autoscaler/metricscollector/fakes/fake_app_collector.go @@ -2,6 +2,7 @@ package fakes import ( + "autoscaler/collection" "autoscaler/metricscollector/collector" "sync" ) @@ -13,6 +14,21 @@ type FakeAppCollector struct { StopStub func() stopMutex sync.RWMutex stopArgsForCall []struct{} + QueryStub func(int64, int64, map[string]string) ([]collection.TSD, bool) + queryMutex sync.RWMutex + queryArgsForCall []struct { + arg1 int64 + arg2 int64 + arg3 map[string]string + } + queryReturns struct { + result1 []collection.TSD + result2 bool + } + queryReturnsOnCall map[int]struct { + result1 []collection.TSD + result2 bool + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -49,6 +65,59 @@ func (fake *FakeAppCollector) StopCallCount() int { return len(fake.stopArgsForCall) } +func (fake *FakeAppCollector) Query(arg1 int64, arg2 int64, arg3 map[string]string) ([]collection.TSD, bool) { + fake.queryMutex.Lock() + ret, specificReturn := fake.queryReturnsOnCall[len(fake.queryArgsForCall)] + fake.queryArgsForCall = append(fake.queryArgsForCall, struct { + arg1 int64 + arg2 int64 + arg3 map[string]string + }{arg1, arg2, arg3}) + fake.recordInvocation("Query", []interface{}{arg1, arg2, arg3}) + fake.queryMutex.Unlock() + if fake.QueryStub != nil { + return fake.QueryStub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fake.queryReturns.result1, fake.queryReturns.result2 +} + +func (fake *FakeAppCollector) QueryCallCount() int { + fake.queryMutex.RLock() + defer fake.queryMutex.RUnlock() + return len(fake.queryArgsForCall) +} + +func (fake *FakeAppCollector) QueryArgsForCall(i int) (int64, int64, map[string]string) { + fake.queryMutex.RLock() + defer fake.queryMutex.RUnlock() + return fake.queryArgsForCall[i].arg1, fake.queryArgsForCall[i].arg2, fake.queryArgsForCall[i].arg3 +} + +func (fake *FakeAppCollector) QueryReturns(result1 []collection.TSD, result2 bool) { + fake.QueryStub = nil + fake.queryReturns = struct { + result1 []collection.TSD + result2 bool + }{result1, result2} +} + +func (fake *FakeAppCollector) 
QueryReturnsOnCall(i int, result1 []collection.TSD, result2 bool) { + fake.QueryStub = nil + if fake.queryReturnsOnCall == nil { + fake.queryReturnsOnCall = make(map[int]struct { + result1 []collection.TSD + result2 bool + }) + } + fake.queryReturnsOnCall[i] = struct { + result1 []collection.TSD + result2 bool + }{result1, result2} +} + func (fake *FakeAppCollector) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -56,6 +125,8 @@ func (fake *FakeAppCollector) Invocations() map[string][][]interface{} { defer fake.startMutex.RUnlock() fake.stopMutex.RLock() defer fake.stopMutex.RUnlock() + fake.queryMutex.RLock() + defer fake.queryMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/src/autoscaler/metricscollector/fakes/fake_instancemetrics_db.go b/src/autoscaler/metricscollector/fakes/fake_instancemetrics_db.go index 0fc308013..aa8846669 100644 --- a/src/autoscaler/metricscollector/fakes/fake_instancemetrics_db.go +++ b/src/autoscaler/metricscollector/fakes/fake_instancemetrics_db.go @@ -1,4 +1,4 @@ -// This file was generated by counterfeiter +// Code generated by counterfeiter. DO NOT EDIT. package fakes import ( @@ -22,6 +22,10 @@ type FakeInstanceMetricsDB struct { result1 []*models.AppInstanceMetric result2 error } + retrieveInstanceMetricsReturnsOnCall map[int]struct { + result1 []*models.AppInstanceMetric + result2 error + } SaveMetricStub func(metric *models.AppInstanceMetric) error saveMetricMutex sync.RWMutex saveMetricArgsForCall []struct { @@ -30,6 +34,9 @@ type FakeInstanceMetricsDB struct { saveMetricReturns struct { result1 error } + saveMetricReturnsOnCall map[int]struct { + result1 error + } SaveMetricsInBulkStub func(metrics []*models.AppInstanceMetric) error saveMetricsInBulkMutex sync.RWMutex saveMetricsInBulkArgsForCall []struct { @@ -38,6 +45,9 @@ type FakeInstanceMetricsDB struct { saveMetricsInBulkReturns struct { result1 error } + saveMetricsInBulkReturnsOnCall map[int]struct { + result1 error + } PruneInstanceMetricsStub func(before int64) error pruneInstanceMetricsMutex sync.RWMutex pruneInstanceMetricsArgsForCall []struct { @@ -46,18 +56,25 @@ type FakeInstanceMetricsDB struct { pruneInstanceMetricsReturns struct { result1 error } + pruneInstanceMetricsReturnsOnCall map[int]struct { + result1 error + } CloseStub func() error closeMutex sync.RWMutex closeArgsForCall []struct{} closeReturns struct { result1 error } + closeReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } func (fake *FakeInstanceMetricsDB) RetrieveInstanceMetrics(appid string, instanceIndex int, name string, start int64, end int64, orderType db.OrderType) ([]*models.AppInstanceMetric, error) { fake.retrieveInstanceMetricsMutex.Lock() + ret, specificReturn := fake.retrieveInstanceMetricsReturnsOnCall[len(fake.retrieveInstanceMetricsArgsForCall)] fake.retrieveInstanceMetricsArgsForCall = append(fake.retrieveInstanceMetricsArgsForCall, struct { appid string instanceIndex int @@ -71,6 +88,9 @@ func (fake *FakeInstanceMetricsDB) RetrieveInstanceMetrics(appid string, instanc if fake.RetrieveInstanceMetricsStub != nil { return fake.RetrieveInstanceMetricsStub(appid, instanceIndex, name, start, end, orderType) } + if specificReturn { + return ret.result1, ret.result2 + } return fake.retrieveInstanceMetricsReturns.result1, fake.retrieveInstanceMetricsReturns.result2 } @@ 
-94,8 +114,23 @@ func (fake *FakeInstanceMetricsDB) RetrieveInstanceMetricsReturns(result1 []*mod }{result1, result2} } +func (fake *FakeInstanceMetricsDB) RetrieveInstanceMetricsReturnsOnCall(i int, result1 []*models.AppInstanceMetric, result2 error) { + fake.RetrieveInstanceMetricsStub = nil + if fake.retrieveInstanceMetricsReturnsOnCall == nil { + fake.retrieveInstanceMetricsReturnsOnCall = make(map[int]struct { + result1 []*models.AppInstanceMetric + result2 error + }) + } + fake.retrieveInstanceMetricsReturnsOnCall[i] = struct { + result1 []*models.AppInstanceMetric + result2 error + }{result1, result2} +} + func (fake *FakeInstanceMetricsDB) SaveMetric(metric *models.AppInstanceMetric) error { fake.saveMetricMutex.Lock() + ret, specificReturn := fake.saveMetricReturnsOnCall[len(fake.saveMetricArgsForCall)] fake.saveMetricArgsForCall = append(fake.saveMetricArgsForCall, struct { metric *models.AppInstanceMetric }{metric}) @@ -104,6 +139,9 @@ func (fake *FakeInstanceMetricsDB) SaveMetric(metric *models.AppInstanceMetric) if fake.SaveMetricStub != nil { return fake.SaveMetricStub(metric) } + if specificReturn { + return ret.result1 + } return fake.saveMetricReturns.result1 } @@ -126,6 +164,18 @@ func (fake *FakeInstanceMetricsDB) SaveMetricReturns(result1 error) { }{result1} } +func (fake *FakeInstanceMetricsDB) SaveMetricReturnsOnCall(i int, result1 error) { + fake.SaveMetricStub = nil + if fake.saveMetricReturnsOnCall == nil { + fake.saveMetricReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.saveMetricReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeInstanceMetricsDB) SaveMetricsInBulk(metrics []*models.AppInstanceMetric) error { var metricsCopy []*models.AppInstanceMetric if metrics != nil { @@ -133,6 +183,7 @@ func (fake *FakeInstanceMetricsDB) SaveMetricsInBulk(metrics []*models.AppInstan copy(metricsCopy, metrics) } fake.saveMetricsInBulkMutex.Lock() + ret, specificReturn := fake.saveMetricsInBulkReturnsOnCall[len(fake.saveMetricsInBulkArgsForCall)] fake.saveMetricsInBulkArgsForCall = append(fake.saveMetricsInBulkArgsForCall, struct { metrics []*models.AppInstanceMetric }{metricsCopy}) @@ -141,6 +192,9 @@ func (fake *FakeInstanceMetricsDB) SaveMetricsInBulk(metrics []*models.AppInstan if fake.SaveMetricsInBulkStub != nil { return fake.SaveMetricsInBulkStub(metrics) } + if specificReturn { + return ret.result1 + } return fake.saveMetricsInBulkReturns.result1 } @@ -163,8 +217,21 @@ func (fake *FakeInstanceMetricsDB) SaveMetricsInBulkReturns(result1 error) { }{result1} } +func (fake *FakeInstanceMetricsDB) SaveMetricsInBulkReturnsOnCall(i int, result1 error) { + fake.SaveMetricsInBulkStub = nil + if fake.saveMetricsInBulkReturnsOnCall == nil { + fake.saveMetricsInBulkReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.saveMetricsInBulkReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeInstanceMetricsDB) PruneInstanceMetrics(before int64) error { fake.pruneInstanceMetricsMutex.Lock() + ret, specificReturn := fake.pruneInstanceMetricsReturnsOnCall[len(fake.pruneInstanceMetricsArgsForCall)] fake.pruneInstanceMetricsArgsForCall = append(fake.pruneInstanceMetricsArgsForCall, struct { before int64 }{before}) @@ -173,6 +240,9 @@ func (fake *FakeInstanceMetricsDB) PruneInstanceMetrics(before int64) error { if fake.PruneInstanceMetricsStub != nil { return fake.PruneInstanceMetricsStub(before) } + if specificReturn { + return ret.result1 + } return fake.pruneInstanceMetricsReturns.result1 } @@ 
-195,14 +265,30 @@ func (fake *FakeInstanceMetricsDB) PruneInstanceMetricsReturns(result1 error) { }{result1} } +func (fake *FakeInstanceMetricsDB) PruneInstanceMetricsReturnsOnCall(i int, result1 error) { + fake.PruneInstanceMetricsStub = nil + if fake.pruneInstanceMetricsReturnsOnCall == nil { + fake.pruneInstanceMetricsReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.pruneInstanceMetricsReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeInstanceMetricsDB) Close() error { fake.closeMutex.Lock() + ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] fake.closeArgsForCall = append(fake.closeArgsForCall, struct{}{}) fake.recordInvocation("Close", []interface{}{}) fake.closeMutex.Unlock() if fake.CloseStub != nil { return fake.CloseStub() } + if specificReturn { + return ret.result1 + } return fake.closeReturns.result1 } @@ -219,6 +305,18 @@ func (fake *FakeInstanceMetricsDB) CloseReturns(result1 error) { }{result1} } +func (fake *FakeInstanceMetricsDB) CloseReturnsOnCall(i int, result1 error) { + fake.CloseStub = nil + if fake.closeReturnsOnCall == nil { + fake.closeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.closeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeInstanceMetricsDB) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -232,7 +330,11 @@ func (fake *FakeInstanceMetricsDB) Invocations() map[string][][]interface{} { defer fake.pruneInstanceMetricsMutex.RUnlock() fake.closeMutex.RLock() defer fake.closeMutex.RUnlock() - return fake.invocations + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations } func (fake *FakeInstanceMetricsDB) recordInvocation(key string, args []interface{}) { diff --git a/src/autoscaler/metricscollector/server/metric_handler.go b/src/autoscaler/metricscollector/server/metric_handler.go index f6875b4ea..08081a744 100644 --- a/src/autoscaler/metricscollector/server/metric_handler.go +++ b/src/autoscaler/metricscollector/server/metric_handler.go @@ -3,6 +3,7 @@ package server import ( "autoscaler/cf" "autoscaler/db" + "autoscaler/metricscollector/collector" "autoscaler/metricscollector/noaa" "autoscaler/models" @@ -20,14 +21,16 @@ type MetricHandler struct { cfClient cf.CFClient logger lager.Logger noaaConsumer noaa.NoaaConsumer + queryFunc collector.MetricQueryFunc database db.InstanceMetricsDB } -func NewMetricHandler(logger lager.Logger, cfc cf.CFClient, consumer noaa.NoaaConsumer, database db.InstanceMetricsDB) *MetricHandler { +func NewMetricHandler(logger lager.Logger, cfc cf.CFClient, consumer noaa.NoaaConsumer, query collector.MetricQueryFunc, database db.InstanceMetricsDB) *MetricHandler { return &MetricHandler{ cfClient: cfc, - noaaConsumer: consumer, logger: logger, + noaaConsumer: consumer, + queryFunc: query, database: database, } } @@ -128,15 +131,21 @@ func (h *MetricHandler) GetMetricHistories(w http.ResponseWriter, r *http.Reques return } - var mtrcs []*models.AppInstanceMetric - - mtrcs, err = h.database.RetrieveInstanceMetrics(appId, int(instanceIndex), metricType, start, end, order) - if err != nil { - h.logger.Error("get-metric-histories-retrieve-metrics", err, lager.Data{"appId": appId, "metrictype": metricType, "instanceIndex": instanceIndex, "start": start, "end": end, "order": order}) - handlers.WriteJSONResponse(w, http.StatusInternalServerError, 
			models.ErrorResponse{
-				Code:    "Interal-Server-Error",
-				Message: "Error getting metric histories from database"})
-			return
+	labels := map[string]string{models.MetricLabelName: metricType}
+	if instanceIndex >= 0 {
+		labels[models.MetricLabelInstanceIndex] = fmt.Sprintf("%d", instanceIndex)
+	}
+	mtrcs, ok := h.queryFunc(appId, start, end+1, order, labels)
+	if !ok {
+		h.logger.Debug("get-metric-histories-query-cache-miss", lager.Data{"appId": appId, "metrictype": metricType, "instanceIndex": instanceIndex, "start": start, "end": end, "order": order})
+		mtrcs, err = h.database.RetrieveInstanceMetrics(appId, int(instanceIndex), metricType, start, end, order)
+		if err != nil {
+			h.logger.Error("get-metric-histories-retrieve-metrics", err, lager.Data{"appId": appId, "metrictype": metricType, "instanceIndex": instanceIndex, "start": start, "end": end, "order": order})
+			handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{
+				Code:    "Interal-Server-Error",
+				Message: "Error getting metric histories from database"})
+			return
+		}
 	}
 	var body []byte
diff --git a/src/autoscaler/metricscollector/server/metric_handler_test.go b/src/autoscaler/metricscollector/server/metric_handler_test.go
index de3793695..b177f9236 100644
--- a/src/autoscaler/metricscollector/server/metric_handler_test.go
+++ b/src/autoscaler/metricscollector/server/metric_handler_test.go
@@ -31,8 +31,10 @@ var _ = Describe("MetricHandler", func() {
 		req      *http.Request
 		err      error
-		metric1  models.AppInstanceMetric
-		metric2  models.AppInstanceMetric
+		metric1  models.AppInstanceMetric
+		metric2  models.AppInstanceMetric
+		metrics  []*models.AppInstanceMetric
+		cacheHit bool
 	)
 	BeforeEach(func() {
@@ -41,7 +43,11 @@ var _ = Describe("MetricHandler", func() {
 		logger := lager.NewLogger("handler-test")
 		database = &fakes.FakeInstanceMetricsDB{}
 		resp = httptest.NewRecorder()
-		handler = NewMetricHandler(logger, cfc, consumer, database)
+		cacheHit = false
+		queryFunc := func(appID string, start int64, end int64, order db.OrderType, labels map[string]string) ([]*models.AppInstanceMetric, bool) {
+			return metrics, cacheHit
+		}
+		handler = NewMetricHandler(logger, cfc, consumer, queryFunc, database)
 	})
 	Describe("GetMetricHistory", func() {
@@ -249,7 +255,7 @@ var _ = Describe("MetricHandler", func() {
 				Expect(err).ToNot(HaveOccurred())
 			})
-			It("queries metrics from database with the given start, end and order ", func() {
+			It("queries metrics with the given start, end and order ", func() {
 				appid, instanceIndex, name, start, end, order := database.RetrieveInstanceMetricsArgsForCall(0)
 				Expect(instanceIndex).To(Equal(0))
 				Expect(appid).To(Equal("an-app-id"))
@@ -280,7 +286,7 @@ var _ = Describe("MetricHandler", func() {
 				Expect(err).ToNot(HaveOccurred())
 			})
-			It("queries metrics from database with start time 0", func() {
+			It("queries metrics with start time 0", func() {
 				_, _, _, start, _, _ := database.RetrieveInstanceMetricsArgsForCall(0)
 				Expect(start).To(Equal(int64(0)))
 			})
@@ -293,7 +299,7 @@ var _ = Describe("MetricHandler", func() {
 				Expect(err).ToNot(HaveOccurred())
 			})
-			It("queries metrics from database with end time -1 ", func() {
+			It("queries metrics with end time -1 ", func() {
 				_, _, _, _, end, _ := database.RetrieveInstanceMetricsArgsForCall(0)
 				Expect(end).To(Equal(int64(-1)))
 			})
@@ -306,14 +312,14 @@ var _ = Describe("MetricHandler", func() {
 				Expect(err).ToNot(HaveOccurred())
 			})
-			It("queries metrics from database with end time -1 ", func() {
+			It("queries metrics with order asc", func() {
 				_, _, _, _, _, order := database.RetrieveInstanceMetricsArgsForCall(0)
 				Expect(order).To(Equal(db.ASC))
 			})
 		})
-		Context("when query database succeeds", func() {
+		Context("when query succeeds", func() {
 			BeforeEach(func() {
 				req, err = http.NewRequest(http.MethodGet, testUrlMetricHistories+"?instanceindex=0&start=123&end=567&order=desc", nil)
 				Expect(err).ToNot(HaveOccurred())
@@ -321,33 +327,54 @@ var _ = Describe("MetricHandler", func() {
 				metric1 = models.AppInstanceMetric{
 					AppId:         "an-app-id",
 					InstanceIndex: 0,
-					CollectedAt:   111122,
+					CollectedAt:   111,
 					Name:          "a-metric-type",
 					Unit:          "metric-unit",
 					Value:         "12345678",
-					Timestamp:     111100,
+					Timestamp:     345,
 				}
 				metric2 = models.AppInstanceMetric{
 					AppId:         "an-app-id",
 					InstanceIndex: 0,
-					CollectedAt:   111122,
+					CollectedAt:   222,
 					Name:          "a-metric-type",
 					Unit:          "metric-unit",
 					Value:         "87654321",
-					Timestamp:     111111,
+					Timestamp:     456,
 				}
-				database.RetrieveInstanceMetricsReturns([]*models.AppInstanceMetric{&metric2, &metric1}, nil)
+			})
-			It("returns 200 with metrics in message body", func() {
-				Expect(resp.Code).To(Equal(http.StatusOK))
+			Context("when cache hits", func() {
+				BeforeEach(func() {
+					cacheHit = true
+					metrics = []*models.AppInstanceMetric{&metric1, &metric2}
+				})
-				mtrcs := &[]models.AppInstanceMetric{}
-				err = json.Unmarshal(resp.Body.Bytes(), mtrcs)
+				It("returns 200 with metrics from cache in message body", func() {
+					Expect(resp.Code).To(Equal(http.StatusOK))
+					result := &[]models.AppInstanceMetric{}
+					err = json.Unmarshal(resp.Body.Bytes(), result)
-				Expect(err).ToNot(HaveOccurred())
-				Expect(*mtrcs).To(Equal([]models.AppInstanceMetric{metric2, metric1}))
+					Expect(err).ToNot(HaveOccurred())
+					Expect(*result).To(Equal([]models.AppInstanceMetric{metric1, metric2}))
+				})
+
+			})
+			Context("when cache misses", func() {
+				BeforeEach(func() {
+					database.RetrieveInstanceMetricsReturns([]*models.AppInstanceMetric{&metric2, &metric1}, nil)
+				})
+				It("returns 200 with metrics from database in message body", func() {
+					Expect(resp.Code).To(Equal(http.StatusOK))
+
+					result := &[]models.AppInstanceMetric{}
+					err = json.Unmarshal(resp.Body.Bytes(), result)
+
+					Expect(err).ToNot(HaveOccurred())
+					Expect(*result).To(Equal([]models.AppInstanceMetric{metric2, metric1}))
+				})
 			})
 		})
diff --git a/src/autoscaler/metricscollector/server/server.go b/src/autoscaler/metricscollector/server/server.go
index 00974a3a1..81e75d348 100644
--- a/src/autoscaler/metricscollector/server/server.go
+++ b/src/autoscaler/metricscollector/server/server.go
@@ -6,6 +6,7 @@ import (
 	"autoscaler/cf"
 	"autoscaler/db"
+	"autoscaler/metricscollector/collector"
 	"autoscaler/metricscollector/config"
 	"autoscaler/metricscollector/noaa"
 	"autoscaler/routes"
@@ -24,8 +25,8 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	vh(w, r, vars)
 }
-func NewServer(logger lager.Logger, conf *config.Config, cfc cf.CFClient, consumer noaa.NoaaConsumer, database db.InstanceMetricsDB) (ifrit.Runner, error) {
-	mh := NewMetricHandler(logger, cfc, consumer, database)
+func NewServer(logger lager.Logger, conf *config.Config, cfc cf.CFClient, consumer noaa.NoaaConsumer, query collector.MetricQueryFunc, database db.InstanceMetricsDB) (ifrit.Runner, error) {
+	mh := NewMetricHandler(logger, cfc, consumer, query, database)
 	r := routes.MetricsCollectorRoutes()
 	r.Get(routes.GetMetricHistoriesRouteName).Handler(VarsFunc(mh.GetMetricHistories))
diff --git a/src/autoscaler/metricscollector/server/server_suite_test.go b/src/autoscaler/metricscollector/server/server_suite_test.go
index 349665b96..e3a17133d 100644
--- a/src/autoscaler/metricscollector/server/server_suite_test.go
+++ b/src/autoscaler/metricscollector/server/server_suite_test.go
@@ -1,9 +1,11 @@
 package server_test
 import (
+	"autoscaler/db"
 	"autoscaler/metricscollector/config"
 	"autoscaler/metricscollector/fakes"
 	"autoscaler/metricscollector/server"
+	"autoscaler/models"
 	"net/url"
 	"strconv"
@@ -36,8 +38,11 @@ var _ = BeforeSuite(func() {
 		},
 	}
 	database := &fakes.FakeInstanceMetricsDB{}
+	queryFunc := func(appID string, start int64, end int64, order db.OrderType, labels map[string]string) ([]*models.AppInstanceMetric, bool) {
+		return nil, false
+	}
-	httpServer, err := server.NewServer(lager.NewLogger("test"), conf, cfc, consumer, database)
+	httpServer, err := server.NewServer(lager.NewLogger("test"), conf, cfc, consumer, queryFunc, database)
 	Expect(err).NotTo(HaveOccurred())
 	serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(port))
diff --git a/src/autoscaler/models/app.go b/src/autoscaler/models/app.go
index b981770da..53cb3195d 100644
--- a/src/autoscaler/models/app.go
+++ b/src/autoscaler/models/app.go
@@ -44,14 +44,6 @@ type AppMonitor struct {
 	StatWindow time.Duration
 }
-type AppMetric struct {
-	AppId      string `json:"app_id"`
-	MetricType string `json:"name"`
-	Value      string `json:"value"`
-	Unit       string `json:"unit"`
-	Timestamp  int64  `json:"timestamp"`
-}
-
 type AppScalingResult struct {
 	AppId  string        `json:"app_id"`
 	Status ScalingStatus `json:"status"`
diff --git a/src/autoscaler/models/metrics.go b/src/autoscaler/models/metrics.go
index 12d92b1c3..d70a92a47 100644
--- a/src/autoscaler/models/metrics.go
+++ b/src/autoscaler/models/metrics.go
@@ -1,17 +1,25 @@
 package models
+import (
+	"fmt"
+)
+
 const (
 	UnitPercentage   = "%"
 	UnitMegaBytes    = "MB"
 	UnitNum          = ""
 	UnitMilliseconds = "ms"
 	UnitRPS          = "rps"
-)
-const MetricNameMemoryUtil = "memoryutil"
-const MetricNameMemoryUsed = "memoryused"
-const MetricNameThroughput = "throughput"
-const MetricNameResponseTime = "responsetime"
+	MetricNameMemoryUtil   = "memoryutil"
+	MetricNameMemoryUsed   = "memoryused"
+	MetricNameThroughput   = "throughput"
+	MetricNameResponseTime = "responsetime"
+
+	MetricLabelAppID         = "app_id"
+	MetricLabelInstanceIndex = "instance_index"
+	MetricLabelName          = "name"
+)
 type AppInstanceMetric struct {
 	AppId string `json:"app_id"`
@@ -22,3 +30,69 @@ type AppInstanceMetric struct {
 	Value     string `json:"value"`
 	Timestamp int64  `json:"timestamp"`
 }
+
+func (m *AppInstanceMetric) GetTimestamp() int64 {
+	return m.Timestamp
+}
+
+func (m *AppInstanceMetric) HasLabels(labels map[string]string) bool {
+	for k, v := range labels {
+		switch k {
+		case MetricLabelAppID:
+			if v == m.AppId {
+				continue
+			} else {
+				return false
+			}
+		case MetricLabelInstanceIndex:
+			if v == fmt.Sprintf("%d", m.InstanceIndex) {
+				continue
+			} else {
+				return false
+			}
+		case MetricLabelName:
+			if v == m.Name {
+				continue
+			} else {
+				return false
+			}
+		default:
+			return false
+		}
+	}
+	return true
+}
+
+type AppMetric struct {
+	AppId      string `json:"app_id"`
+	MetricType string `json:"name"`
+	Value      string `json:"value"`
+	Unit       string `json:"unit"`
+	Timestamp  int64  `json:"timestamp"`
+}
+
+func (m *AppMetric) GetTimestamp() int64 {
+	return m.Timestamp
+}
+
+func (m *AppMetric) HasLabels(labels map[string]string) bool {
+	for k, v := range labels {
+		switch k {
+		case MetricLabelAppID:
+			if v == m.AppId {
+				continue
+			} else {
+				return false
+			}
+		case MetricLabelName:
+			if v == m.MetricType {
+				continue
+			} else {
+				return false
+			}
+		default:
+			return false
+		}
+	}
+	return true
+}
diff --git a/src/integration/components.go b/src/integration/components.go
index f71c59dc7..eaf059455 100644
--- a/src/integration/components.go
+++ b/src/integration/components.go
@@ -61,8 +61,8 @@ type APIServerClient struct {
 }
 type ServiceBrokerConfig struct {
-	Port       int `json:"port"`
-	PublicPort int `json:"publicPort"`
+	Port                int `json:"port"`
+	PublicPort          int `json:"publicPort"`
 	EnableCustomMetrics bool `json:"enableCustomMetrics"`
 	Username string `json:"username"`
@@ -227,10 +227,10 @@ func (components *Components) Operator(confPath string, argv ...string) *ginkgom
 func (components *Components) PrepareServiceBrokerConfig(publicPort int, internalPort int, username string, password string, enableCustomMetrics bool, dbUri string, apiServerUri string, brokerApiHttpRequestTimeout time.Duration, tmpDir string) string {
 	brokerConfig := ServiceBrokerConfig{
-		Port:       internalPort,
-		PublicPort: publicPort,
-		Username:   username,
-		Password:   password,
+		Port:                internalPort,
+		PublicPort:          publicPort,
+		Username:            username,
+		Password:            password,
 		EnableCustomMetrics: enableCustomMetrics,
 		DB: DBConfig{
 			URI: dbUri,
@@ -441,10 +441,11 @@ func (components *Components) PrepareMetricsCollectorConfig(dbURI string, port i
 			},
 		},
 		Collector: mcConfig.CollectorConfig{
-			CollectInterval: collectInterval,
-			RefreshInterval: refreshInterval,
-			CollectMethod:   collectMethod,
-			SaveInterval:    saveInterval,
+			CollectInterval:       collectInterval,
+			RefreshInterval:       refreshInterval,
+			CollectMethod:         collectMethod,
+			SaveInterval:          saveInterval,
+			MetricCacheSizePerApp: 500,
 		},
 	}
 	return writeYmlConfig(tmpDir, MetricsCollector, &cfg)
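Taken together, the handler change and the label constants above work as follows: GetMetricHistories first asks the in-memory cache through queryFunc, filtering by metric name (and optionally instance index) via HasLabels, and only falls back to RetrieveInstanceMetrics when the cache cannot guarantee completeness. The snippet below is an illustrative sketch only, not part of the patch; it assumes the autoscaler/collection and autoscaler/models packages introduced here, and the app id, values and timestamps are made up.

package main

import (
	"fmt"

	"autoscaler/collection"
	"autoscaler/models"
)

func main() {
	// A small cache holding the latest three data points for one app.
	cache := collection.NewTSDCache(3)
	cache.Put(&models.AppInstanceMetric{AppId: "an-app-id", InstanceIndex: 0, Name: models.MetricNameMemoryUsed, Value: "100", Timestamp: 100})
	cache.Put(&models.AppInstanceMetric{AppId: "an-app-id", InstanceIndex: 1, Name: models.MetricNameMemoryUsed, Value: "200", Timestamp: 200})
	cache.Put(&models.AppInstanceMetric{AppId: "an-app-id", InstanceIndex: 0, Name: models.MetricNameMemoryUsed, Value: "300", Timestamp: 300})

	// Same label shape the handler builds: metric name plus an optional instance index.
	labels := map[string]string{
		models.MetricLabelName:          models.MetricNameMemoryUsed,
		models.MetricLabelInstanceIndex: "0",
	}

	// Query is end-exclusive, which is why the handler passes end+1.
	data, ok := cache.Query(100, 300+1, labels)
	if !ok {
		// The cache cannot guarantee all points for this range are still present;
		// this is the case where the handler reads from the database instead.
		fmt.Println("cache miss, fall back to the database")
		return
	}
	for _, d := range data {
		// Prints the two instance-0 points (timestamps 100 and 300); the
		// instance-1 point is filtered out by HasLabels.
		fmt.Println(d.(*models.AppInstanceMetric).Value, d.GetTimestamp())
	}
}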
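NewServer now also takes a collector.MetricQueryFunc, whose definition is not shown in this diff; the signature below is copied from the test fakes, and db.DESC is assumed to exist alongside db.ASC. The following is a hypothetical sketch of how a collector could back that function with one TSDCache per app, sized by MetricCacheSizePerApp (500 in the integration config above); cachedCollector, Save and QueryMetrics are invented names for illustration, not the patch's actual collector code.

package main

import (
	"fmt"
	"sync"

	"autoscaler/collection"
	"autoscaler/db"
	"autoscaler/models"
)

type cachedCollector struct {
	lock      sync.RWMutex
	cacheSize int
	caches    map[string]*collection.TSDCache // one cache per app
}

func newCachedCollector(cacheSizePerApp int) *cachedCollector {
	return &cachedCollector{cacheSize: cacheSizePerApp, caches: map[string]*collection.TSDCache{}}
}

// Save would be called for every metric the collector receives.
func (c *cachedCollector) Save(m *models.AppInstanceMetric) {
	c.lock.Lock()
	cache, exists := c.caches[m.AppId]
	if !exists {
		cache = collection.NewTSDCache(c.cacheSize)
		c.caches[m.AppId] = cache
	}
	c.lock.Unlock()
	cache.Put(m)
}

// QueryMetrics matches the query-function signature handed to the metric handler:
// it reports false whenever the cache cannot answer, so the caller falls back
// to the database.
func (c *cachedCollector) QueryMetrics(appID string, start, end int64, order db.OrderType, labels map[string]string) ([]*models.AppInstanceMetric, bool) {
	c.lock.RLock()
	cache, exists := c.caches[appID]
	c.lock.RUnlock()
	if !exists {
		return nil, false
	}
	data, ok := cache.Query(start, end, labels)
	if !ok {
		return nil, false
	}
	result := make([]*models.AppInstanceMetric, 0, len(data))
	for _, d := range data {
		result = append(result, d.(*models.AppInstanceMetric))
	}
	// The cache keeps ascending order; reverse for descending queries.
	if order == db.DESC {
		for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
			result[i], result[j] = result[j], result[i]
		}
	}
	return result, true
}

func main() {
	c := newCachedCollector(500)
	c.Save(&models.AppInstanceMetric{AppId: "an-app-id", Name: models.MetricNameThroughput, Value: "10", Timestamp: 111})
	metrics, ok := c.QueryMetrics("an-app-id", 0, 200, db.ASC, map[string]string{models.MetricLabelName: models.MetricNameThroughput})
	fmt.Println(ok, len(metrics)) // true 1
}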