Skip to content

Commit

Permalink
Merge pull request #12807 from influxdata/jmw-cache-cleanups
Browse files Browse the repository at this point in the history
Cache cleanups
  • Loading branch information
zeebo authored Apr 1, 2019
2 parents e983809 + 96a01ee commit a7c3f20
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 383 deletions.
208 changes: 15 additions & 193 deletions tsdb/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,154 +16,32 @@ import (
"go.uber.org/zap"
)

// ringShards specifies the number of partitions that the hash ring used to
// store the entry mappings contains. It must be a power of 2. From empirical
// testing, a value above the number of cores on the machine does not provide
// any additional benefit. For now we'll set it to the number of cores on the
// largest box we could imagine running influx.
const ringShards = 16

var (
	// ErrSnapshotInProgress is returned if a snapshot is attempted while one is already running.
	ErrSnapshotInProgress = fmt.Errorf("snapshot in progress")
)

// ErrCacheMemorySizeLimitExceeded returns an error indicating an operation
// could not be completed due to exceeding the cache-max-memory-size setting.
func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error {
	const format = "cache-max-memory-size exceeded: (%d/%d)"
	return fmt.Errorf(format, n, limit)
}

// entry is a set of values and some metadata.
type entry struct {
	mu     sync.RWMutex
	values Values // All stored values, guarded by mu.

	// The type of values stored. The comment originally said this is read
	// only, but add() assigns it (under mu) when the first values arrive
	// while also reading it without the lock — NOTE(review): confirm the
	// intended synchronization for this field.
	vtype byte
}

// newEntryValues returns a new instance of entry with the given values. If the
// values are not valid, an error is returned.
//
// The caller's slice is copied, so the entry owns its backing array.
// Returns tsdb.ErrFieldTypeConflict if the values are not all the same type.
// (NOTE(review): the closing brace of this function is missing here — the
// surrounding diff splice cut it off.)
func newEntryValues(values []Value) (*entry, error) {
	e := &entry{}
	e.values = make(Values, 0, len(values))
	e.values = append(e.values, values...)

	// No values, don't check types and ordering
	if len(values) == 0 {
		return e, nil
	}

	et := valueType(values[0])
	for _, v := range values {
		// Make sure all the values are the same type
		if et != valueType(v) {
			return nil, tsdb.ErrFieldTypeConflict
		}
	}

	// Set the type of values stored.
	e.vtype = et

	return e, nil
// CacheMemorySizeLimitExceededError is the type of error returned from the cache when
// a write would place it over its size limit.
type CacheMemorySizeLimitExceededError struct {
	Size  uint64 // The cache size that exceeded the limit.
	Limit uint64 // The configured cache-max-memory-size.
}

// add adds the given values to the entry. It returns
// tsdb.ErrFieldTypeConflict if any of the new values do not match the type
// already stored in the entry.
func (e *entry) add(values []Value) error {
	if len(values) == 0 {
		return nil // Nothing to do.
	}

	// Perform the type check and the append in one critical section. The
	// previous code read e.vtype without the lock while the first-write
	// branch below assigns it under the lock — a data race — and also
	// allowed a concurrent add to record a different type between the
	// check and the append.
	e.mu.Lock()
	defer e.mu.Unlock()

	// Are any of the new values the wrong type?
	if e.vtype != 0 {
		for _, v := range values {
			if e.vtype != valueType(v) {
				return tsdb.ErrFieldTypeConflict
			}
		}
	}

	// entry currently has no values: take ownership of the caller's slice
	// and record the type of values stored.
	if len(e.values) == 0 {
		e.values = values
		e.vtype = valueType(values[0])
		return nil
	}

	// Append the new values to the existing ones.
	e.values = append(e.values, values...)
	return nil
}

// deduplicate sorts and orders the entry's values. If values are already
// deduped and sorted, the function does no work and simply returns.
func (e *entry) deduplicate() {
	e.mu.Lock()
	// A single value (or none) is trivially sorted and unique.
	if len(e.values) > 1 {
		e.values = e.values.Deduplicate()
	}
	e.mu.Unlock()
}

// count returns the number of values in this entry.
// (NOTE(review): the closing brace of this function is missing here — the
// surrounding diff splice cut it off.)
func (e *entry) count() int {
	e.mu.RLock()
	n := len(e.values) // Length is read under the read lock.
	e.mu.RUnlock()
	return n
// Error implements the error interface, reporting the attempted cache size
// against the configured limit.
func (c CacheMemorySizeLimitExceededError) Error() string {
	const format = "cache-max-memory-size exceeded: (%d/%d)"
	return fmt.Sprintf(format, c.Size, c.Limit)
}

// filter removes all values with timestamps between min and max inclusive.
func (e *entry) filter(min, max int64) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Deduplicate first so Exclude operates on sorted, unique values.
	if len(e.values) > 1 {
		e.values = e.values.Deduplicate()
	}
	e.values = e.values.Exclude(min, max)
}

// size returns the size of this entry in bytes.
func (e *entry) size() int {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.values.Size()
}

// InfluxQLType returns for the entry the data type of its values.
func (e *entry) InfluxQLType() (influxql.DataType, error) {
	e.mu.RLock()
	typ, err := e.values.InfluxQLType()
	e.mu.RUnlock()
	return typ, err
}

// storer is the interface that describes a cache's store.
// (NOTE(review): the closing brace of this interface is missing here — the
// surrounding diff splice cut it off.)
type storer interface {
	entry(key []byte) *entry                        // Get an entry by its key.
	write(key []byte, values Values) (bool, error)  // Write an entry to the store.
	add(key []byte, entry *entry)                   // Add a new entry to the store.
	remove(key []byte)                              // Remove an entry from the store.
	keys(sorted bool) [][]byte                      // Return an optionally sorted slice of entry keys.
	apply(f func([]byte, *entry) error) error       // Apply f to all entries in the store in parallel.
	applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
	reset()                                         // Reset the store to an initial unused state.
	split(n int) []storer                           // Split splits the store into n stores.
	count() int                                     // Count returns the number of keys in the store.
// ErrCacheMemorySizeLimitExceeded returns an error indicating an operation
// could not be completed due to exceeding the cache-max-memory-size setting.
func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error {
	err := CacheMemorySizeLimitExceededError{Size: n, Limit: limit}
	return err
}

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
_ uint64 // Padding for 32 bit struct alignment
mu sync.RWMutex
store storer
store *ring
maxSize uint64

// snapshots are the cache objects that are currently being written to tsm files
Expand All @@ -175,53 +53,22 @@ type Cache struct {
tracker *cacheTracker
lastSnapshot time.Time
lastWriteTime time.Time

// A one time synchronization used to initial the cache with a store. Since the store can allocate a
// a large amount memory across shards, we lazily create it.
initialize atomic.Value
initializedCount uint32
}

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots.
func NewCache(maxSize uint64) *Cache {
c := &Cache{
return &Cache{
maxSize: maxSize,
store: emptyStore{},
store: newRing(),
lastSnapshot: time.Now(),
tracker: newCacheTracker(newCacheMetrics(nil), nil),
}
c.initialize.Store(&sync.Once{})
return c
}

// init initializes the cache and allocates the underlying store. Once initialized,
// the store is re-used until Freed.
func (c *Cache) init() {
	// Only the first caller flips the counter; every later call is a no-op.
	if atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
		c.mu.Lock()
		defer c.mu.Unlock()
		// Error ignored: ringShards is a constant power of 2 — presumably
		// newring cannot fail for it; TODO confirm against newring.
		c.store, _ = newring(ringShards)
	}
}

// Free releases the underlying store and memory held by the Cache.
func (c *Cache) Free() {
	// Only tear down a cache that was actually initialized.
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 1, 0) {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.store = emptyStore{}
}

// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache will exceed its max size by adding the new values.
func (c *Cache) Write(key []byte, values []Value) error {
c.init()
addedSize := uint64(Values(values).Size())

// Enough room in the cache?
Expand Down Expand Up @@ -259,7 +106,6 @@ func (c *Cache) Write(key []byte, values []Value) error {
// values as possible. If one key fails, the others can still succeed and an
// error will be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
c.init()
var addedSize uint64
for _, v := range values {
addedSize += uint64(Values(v).Size())
Expand Down Expand Up @@ -320,8 +166,6 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that
// are being flushed, and resets the current cache with new values.
func (c *Cache) Snapshot() (*Cache, error) {
c.init()

c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -334,13 +178,8 @@ func (c *Cache) Snapshot() (*Cache, error) {

// If no snapshot exists, create a new one, otherwise update the existing snapshot
if c.snapshot == nil {
store, err := newring(ringShards)
if err != nil {
return nil, err
}

c.snapshot = &Cache{
store: store,
store: newRing(),
tracker: newCacheTracker(c.tracker.metrics, c.tracker.labels),
}
}
Expand Down Expand Up @@ -384,8 +223,6 @@ func (c *Cache) Deduplicate() {
// ClearSnapshot removes the snapshot cache from the list of flushing caches and
// adjusts the size.
func (c *Cache) ClearSnapshot(success bool) {
c.init()

c.mu.RLock()
snapStore := c.snapshot.store
c.mu.RUnlock()
Expand Down Expand Up @@ -550,8 +387,6 @@ func (c *Cache) Values(key []byte) Values {
// with timestamps between min and max contained in the bucket identified
// by name from the cache.
func (c *Cache) DeleteBucketRange(name []byte, min, max int64) {
c.init()

// TODO(edd/jeff): find a way to optimize lock usage
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -859,16 +694,3 @@ func valueType(v Value) byte {
return 0
}
}

// emptyStore is a no-op storer: every accessor returns the zero value and
// every mutator does nothing.
type emptyStore struct{}

func (emptyStore) entry([]byte) *entry                          { return nil }
func (emptyStore) write([]byte, Values) (bool, error)           { return false, nil }
func (emptyStore) add([]byte, *entry)                           {}
func (emptyStore) remove([]byte)                                {}
func (emptyStore) keys(bool) [][]byte                           { return nil }
func (emptyStore) apply(func([]byte, *entry) error) error       { return nil }
func (emptyStore) applySerial(func([]byte, *entry) error) error { return nil }
func (emptyStore) reset()                                       {}
func (emptyStore) split(int) []storer                           { return nil }
func (emptyStore) count() int                                   { return 0 }
Loading

0 comments on commit a7c3f20

Please sign in to comment.