Skip to content
This repository has been archived by the owner on Dec 1, 2021. It is now read-only.

cache metric data in metrics collector #379

Merged
merged 2 commits into from
Sep 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions src/autoscaler/collection/TSDCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package collection

import (
"fmt"
"sync"
)

type TSD interface {
GetTimestamp() int64
HasLabels(map[string]string) bool
}

type TSDCache struct {
lock *sync.RWMutex
data []TSD
capacity int
cursor int
num int
}

func NewTSDCache(capacity int) *TSDCache {
if capacity <= 0 {
panic("invalid TSDCache capacity")
}
return &TSDCache{
lock: &sync.RWMutex{},
data: make([]TSD, capacity, capacity),
capacity: capacity,
cursor: 0,
}
}

func (c *TSDCache) binarySearch(t int64) int {
if c.num == 0 {
return 0
}
var l, r int
if c.data[c.cursor] == nil {
l = 0
r = c.cursor - 1
} else {
l = c.cursor
r = c.cursor - 1 + c.capacity
}

for {
if l > r {
return l
}
m := l + (r-l)/2
if t <= c.data[m%c.capacity].GetTimestamp() {
r = m - 1
} else {
l = m + 1
}
}
}

func (c *TSDCache) Put(d TSD) {
c.lock.Lock()
defer c.lock.Unlock()

defer func() {
c.num++
}()

if c.num == 0 || d.GetTimestamp() >= c.data[((c.cursor-1)+c.capacity)%c.capacity].GetTimestamp() {
c.data[c.cursor] = d
c.cursor = (c.cursor + 1) % c.capacity
return
}

pos := c.binarySearch(d.GetTimestamp())
if pos == c.cursor && c.data[c.cursor] != nil {
return
}

end := c.cursor
if c.data[end] != nil {
end += c.capacity
}
for i := end; i > pos; i-- {
c.data[i%c.capacity] = c.data[(i-1)%c.capacity]
}
c.data[pos%c.capacity] = d
c.cursor = (c.cursor + 1) % c.capacity
}

func (c *TSDCache) String() string {
c.lock.RLock()
defer c.lock.RUnlock()

var head, tail int
if c.data[c.cursor] == nil {
head = 0
tail = c.cursor - 1
} else {
head = c.cursor
tail = c.cursor + c.capacity - 1
}

s := make([]TSD, tail-head+1)
for i := 0; i <= tail-head; i++ {
s[i] = c.data[(i+head)%c.capacity]
}
return fmt.Sprint(s)
}

//
// Query returns the time series with the timestamp in [start, end)
// If it can not guarantee all the data are in cache, it returns (nil, false)
//
func (c *TSDCache) Query(start, end int64, labels map[string]string) ([]TSD, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

if c.num > c.capacity && c.data[c.cursor].GetTimestamp() >= start {
return nil, false
}

from := c.binarySearch(start)
to := c.binarySearch(end)
result := []TSD{}
for i := from; i < to; i++ {
d := c.data[i%c.capacity]
if d.HasLabels(labels) {
result = append(result, d)
}
}
return result, true
}
199 changes: 199 additions & 0 deletions src/autoscaler/collection/TSDCache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package collection_test

import (
"time"

. "autoscaler/collection"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("TSDCache", func() {
var (
cache *TSDCache
capacity int
err interface{}
labels map[string]string
)

JustBeforeEach(func() {
defer func() {
err = recover()
}()
cache = NewTSDCache(capacity)
})

Describe("NewTSDCache", func() {
Context("when creating TSDCache with invalid capacity", func() {
BeforeEach(func() {
capacity = -1
})
It("panics", func() {
Expect(err).To(Equal("invalid TSDCache capacity"))
})
})
Context("when creating TSDCache with valid capacity", func() {
BeforeEach(func() {
capacity = 10

})
It("returns the TSDCache", func() {
Expect(err).To(BeNil())
Expect(cache).NotTo(BeNil())
})
})
})

Describe("Put", func() {
Context("when cache capacity is 1", func() {
BeforeEach(func() {
capacity = 1
})
It("only caches the latest data", func() {
cache.Put(TestTSD{10, nil})
Expect(cache.String()).To(Equal("[{10 map[]}]"))
cache.Put(TestTSD{20, nil})
Expect(cache.String()).To(Equal("[{20 map[]}]"))
cache.Put(TestTSD{15, nil})
Expect(cache.String()).To(Equal("[{20 map[]}]"))
cache.Put(TestTSD{30, nil})
Expect(cache.String()).To(Equal("[{30 map[]}]"))
})
})
Context("when data put to cache do not execeed the capacity", func() {
BeforeEach(func() {
capacity = 5
})
It("cache all data in ascending order", func() {
cache.Put(TestTSD{20, nil})
cache.Put(TestTSD{10, nil})
cache.Put(TestTSD{40, nil})
cache.Put(TestTSD{50, nil})
cache.Put(TestTSD{30, nil})
Expect(cache.String()).To(Equal("[{10 map[]} {20 map[]} {30 map[]} {40 map[]} {50 map[]}]"))
})
})
Context("when data put to cache execeed the capacity", func() {
BeforeEach(func() {
capacity = 3
})
It("caches latest data in ascending order", func() {
cache.Put(TestTSD{20, nil})
Expect(cache.String()).To(Equal("[{20 map[]}]"))
cache.Put(TestTSD{10, nil})
Expect(cache.String()).To(Equal("[{10 map[]} {20 map[]}]"))
cache.Put(TestTSD{40, nil})
Expect(cache.String()).To(Equal("[{10 map[]} {20 map[]} {40 map[]}]"))
cache.Put(TestTSD{50, nil})
Expect(cache.String()).To(Equal("[{20 map[]} {40 map[]} {50 map[]}]"))
cache.Put(TestTSD{30, nil})
Expect(cache.String()).To(Equal("[{30 map[]} {40 map[]} {50 map[]}]"))
cache.Put(TestTSD{50, nil})
Expect(cache.String()).To(Equal("[{40 map[]} {50 map[]} {50 map[]}]"))
})
})

})

Describe("Query", func() {
Context("when cache is empty", func() {
It("return empty results", func() {
result, ok := cache.Query(0, time.Now().UnixNano(), labels)
Expect(ok).To(BeTrue())
Expect(result).To(BeEmpty())
})
})
Context("when data put to cache do not execeeds the capacity", func() {
BeforeEach(func() {
capacity = 5
})
It("returns the data in [start, end)", func() {
cache.Put(TestTSD{20, nil})
result, ok := cache.Query(10, 40, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{20, nil}}))

cache.Put(TestTSD{10, nil})
result, ok = cache.Query(10, 40, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}}))

cache.Put(TestTSD{40, nil})
result, ok = cache.Query(10, 40, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}}))

cache.Put(TestTSD{30, nil})
result, ok = cache.Query(10, 40, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}, TestTSD{30, nil}}))

cache.Put(TestTSD{50, nil})
result, ok = cache.Query(10, 40, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{10, nil}, TestTSD{20, nil}, TestTSD{30, nil}}))
})
})

Context("when data put to cache execeed the capacity", func() {
BeforeEach(func() {
capacity = 3
})

Context("when all queried data are guarenteed in cache", func() {
It("returns the data in [start, end)", func() {
cache.Put(TestTSD{20, nil})
cache.Put(TestTSD{10, nil})
cache.Put(TestTSD{40, nil})
cache.Put(TestTSD{30, nil})

result, ok := cache.Query(30, 50, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{30, nil}, TestTSD{40, nil}}))

cache.Put(TestTSD{50, nil})
result, ok = cache.Query(35, 50, labels)
Expect(ok).To(BeTrue())
Expect(result).To(Equal([]TSD{TestTSD{40, nil}}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why query database directly, the "end" timestamp is included. Is that possible to make the implementation compatible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change made in https://github.com/cloudfoundry-incubator/app-autoscaler/pull/379/files#diff-689f77dbaf45e61a44746cb0b0ff27baR138
mtrcs, ok := h.queryFunc(appId, start, end+1, order, labels)

use start, end+1 to query cache will make sure you get metrics data in [start, end]

})
Context("when querying with labels", func() {
BeforeEach(func() {
capacity = 5
})
It("returns the data with all the labels", func() {
cache.Put(TestTSD{20, map[string]string{"tom": "cat", "pig": "pepper"}})
cache.Put(TestTSD{10, nil})
cache.Put(TestTSD{40, map[string]string{"jerry": "mouse", "tom": "cat", "peppa": "pig"}})
cache.Put(TestTSD{30, map[string]string{"jerry": "mouse"}})
cache.Put(TestTSD{50, nil})

result, ok := cache.Query(20, 60, map[string]string{"jerry": "mouse", "tom": "cat"})
Expect(ok).To(BeTrue())
Expect(result).To(Equal(([]TSD{TestTSD{40, map[string]string{"jerry": "mouse", "tom": "cat", "peppa": "pig"}}})))
})

})

})
Context("when queried data are possibly not in cache", func() {
It("returns false", func() {
cache.Put(TestTSD{20, nil})
cache.Put(TestTSD{10, nil})
cache.Put(TestTSD{40, nil})
cache.Put(TestTSD{30, nil})

_, ok := cache.Query(10, 50, labels)
Expect(ok).To(BeFalse())

cache.Put(TestTSD{50, nil})
_, ok = cache.Query(30, 50, labels)
Expect(ok).To(BeFalse())
})

})

})

})
})
32 changes: 32 additions & 0 deletions src/autoscaler/collection/collection_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package collection_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestCollection(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Collection Suite")
}

type TestTSD struct {
timestamp int64
labels map[string]string
}

func (t TestTSD) GetTimestamp() int64 {
return t.timestamp
}

func (t TestTSD) HasLabels(labels map[string]string) bool {
for k, v := range labels {
if t.labels[k] == v {
continue
}
return false
}
return true
}
Loading