diff --git a/trie/database.go b/trie/database.go index d8a0fa9c5342..c24d9b61f7a3 100644 --- a/trie/database.go +++ b/trie/database.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/allegro/bigcache" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" @@ -69,7 +69,7 @@ const secureKeyLength = 11 + 32 type Database struct { diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes - cleans *bigcache.BigCache // GC friendly memory cache of clean node RLPs + cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs dirties map[common.Hash]*cachedNode // Data and references relationships of dirty nodes oldest common.Hash // Oldest tracked node, flush-list head newest common.Hash // Newest tracked node, flush-list tail @@ -299,16 +299,9 @@ func NewDatabase(diskdb ethdb.KeyValueStore) *Database { // before its written out to disk or garbage collected. It also acts as a read cache // for nodes loaded from disk. func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database { - var cleans *bigcache.BigCache + var cleans *fastcache.Cache if cache > 0 { - cleans, _ = bigcache.NewBigCache(bigcache.Config{ - Shards: 1024, - LifeWindow: time.Hour, - MaxEntriesInWindow: cache * 1024, - MaxEntrySize: 512, - HardMaxCacheSize: cache, - Hasher: trienodeHasher{}, - }) + cleans = fastcache.New(cache * 1024 * 1024) } return &Database{ diskdb: diskdb, @@ -384,7 +377,7 @@ func (db *Database) insertPreimage(hash common.Hash, preimage []byte) { func (db *Database) node(hash common.Hash) node { // Retrieve the node from the clean cache if available if db.cleans != nil { - if enc, err := db.cleans.Get(string(hash[:])); err == nil && enc != nil { + if enc := db.cleans.Get(nil, hash[:]); enc != nil { memcacheCleanHitMeter.Mark(1) memcacheCleanReadMeter.Mark(int64(len(enc))) return mustDecodeNode(hash[:], enc) @@ -404,7 +397,7 @@ func (db *Database) node(hash common.Hash) node { return nil } if db.cleans != nil { - db.cleans.Set(string(hash[:]), enc) + db.cleans.Set(hash[:], enc) memcacheCleanMissMeter.Mark(1) memcacheCleanWriteMeter.Mark(int64(len(enc))) } @@ -420,7 +413,7 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) { } // Retrieve the node from the clean cache if available if db.cleans != nil { - if enc, err := db.cleans.Get(string(hash[:])); err == nil && enc != nil { + if enc := db.cleans.Get(nil, hash[:]); enc != nil { memcacheCleanHitMeter.Mark(1) memcacheCleanReadMeter.Mark(int64(len(enc))) return enc, nil @@ -438,7 +431,7 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) { enc, err := db.diskdb.Get(hash[:]) if err == nil && enc != nil { if db.cleans != nil { - db.cleans.Set(string(hash[:]), enc) + db.cleans.Set(hash[:], enc) memcacheCleanMissMeter.Mark(1) memcacheCleanWriteMeter.Mark(int64(len(enc))) } @@ -835,7 +828,7 @@ func (c *cleaner) Put(key []byte, rlp []byte) error { } // Move the flushed node into the clean cache to prevent insta-reloads if c.db.cleans != nil { - c.db.cleans.Set(string(hash[:]), rlp) + c.db.cleans.Set(hash[:], rlp) } return nil } diff --git a/vendor/github.com/VictoriaMetrics/fastcache/LICENSE b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE new file mode 100644 index 000000000000..9a8145e5834e --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 VictoriaMetrics + +Permission is hereby granted, free of charge, to any 
person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/VictoriaMetrics/fastcache/README.md b/vendor/github.com/VictoriaMetrics/fastcache/README.md new file mode 100644 index 000000000000..ac6fbaf7f1eb --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/README.md @@ -0,0 +1,116 @@ +[![Build Status](https://travis-ci.org/VictoriaMetrics/fastcache.svg)](https://travis-ci.org/VictoriaMetrics/fastcache) +[![GoDoc](https://godoc.org/github.com/VictoriaMetrics/fastcache?status.svg)](http://godoc.org/github.com/VictoriaMetrics/fastcache) +[![Go Report](https://goreportcard.com/badge/github.com/VictoriaMetrics/fastcache)](https://goreportcard.com/report/github.com/VictoriaMetrics/fastcache) +[![codecov](https://codecov.io/gh/VictoriaMetrics/fastcache/branch/master/graph/badge.svg)](https://codecov.io/gh/VictoriaMetrics/fastcache) + +# fastcache - fast thread-safe inmemory cache for big number of entries in Go + +### Features + +* Fast. Performance scales on multi-core CPUs. See benchmark results below. +* Thread-safe. Concurrent goroutines may read and write into a single + cache instance. +* The fastcache is designed for storing big number of entries without + [GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). +* Fastcache automatically evicts old entries when reaching the maximum cache size + set on its creation. +* [Simple API](http://godoc.org/github.com/VictoriaMetrics/fastcache). +* Simple source code. +* Cache may be [saved to file](https://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SaveToFile) + and [loaded from file](https://godoc.org/github.com/VictoriaMetrics/fastcache#LoadFromFile). +* Works on [Google AppEngine](https://cloud.google.com/appengine/docs/go/). + + +### Benchmarks + +`Fastcache` performance is compared with [BigCache](https://github.com/allegro/bigcache), standard Go map +and [sync.Map](https://golang.org/pkg/sync/#Map). 
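+
+Before the numbers, a minimal, illustrative usage sketch of the API being
+benchmarked; it relies only on the `New`, `Set`, `Get`, `SaveToFile` and
+`LoadFromFileOrNew` calls shown elsewhere in this change (the file path and
+key used here are placeholders for illustration):
+
+```go
+package main
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/VictoriaMetrics/fastcache"
+)
+
+func main() {
+	// Create a cache capped at 32 MiB; old entries are evicted automatically
+	// once this limit is reached.
+	c := fastcache.New(32 * 1024 * 1024)
+
+	// Keys and values are plain byte slices.
+	c.Set([]byte("foo"), []byte("bar"))
+
+	// Get appends the value for the key to the supplied buffer (nil here)
+	// and returns the result; a miss yields an empty slice.
+	if v := c.Get(nil, []byte("foo")); len(v) > 0 {
+		fmt.Printf("foo = %s\n", v)
+	}
+
+	// The cache contents can be persisted and restored; LoadFromFileOrNew
+	// falls back to an empty cache of the given capacity on any load error.
+	if err := c.SaveToFile("/tmp/fastcache.state"); err != nil {
+		log.Fatal(err)
+	}
+	c = fastcache.LoadFromFileOrNew("/tmp/fastcache.state", 32*1024*1024)
+	fmt.Printf("restored foo = %s\n", c.Get(nil, []byte("foo")))
+}
+```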
+ +``` +GOMAXPROCS=4 go test github.com/VictoriaMetrics/fastcache -bench='Set|Get' -benchtime=10s +goos: linux +goarch: amd64 +pkg: github.com/VictoriaMetrics/fastcache +BenchmarkBigCacheSet-4 2000 10566656 ns/op 6.20 MB/s 4660369 B/op 6 allocs/op +BenchmarkBigCacheGet-4 2000 6902694 ns/op 9.49 MB/s 684169 B/op 131076 allocs/op +BenchmarkBigCacheSetGet-4 1000 17579118 ns/op 7.46 MB/s 5046744 B/op 131083 allocs/op +BenchmarkCacheSet-4 5000 3808874 ns/op 17.21 MB/s 1142 B/op 2 allocs/op +BenchmarkCacheGet-4 5000 3293849 ns/op 19.90 MB/s 1140 B/op 2 allocs/op +BenchmarkCacheSetGet-4 2000 8456061 ns/op 15.50 MB/s 2857 B/op 5 allocs/op +BenchmarkStdMapSet-4 2000 10559382 ns/op 6.21 MB/s 268413 B/op 65537 allocs/op +BenchmarkStdMapGet-4 5000 2687404 ns/op 24.39 MB/s 2558 B/op 13 allocs/op +BenchmarkStdMapSetGet-4 100 154641257 ns/op 0.85 MB/s 387405 B/op 65558 allocs/op +BenchmarkSyncMapSet-4 500 24703219 ns/op 2.65 MB/s 3426543 B/op 262411 allocs/op +BenchmarkSyncMapGet-4 5000 2265892 ns/op 28.92 MB/s 2545 B/op 79 allocs/op +BenchmarkSyncMapSetGet-4 1000 14595535 ns/op 8.98 MB/s 3417190 B/op 262277 allocs/op +``` + +`MB/s` column here actually means `millions of operations per second`. +As you can see, `fastcache` is faster than the `BigCache` in all the cases. +`fastcache` is faster than the standard Go map and `sync.Map` on workloads +with inserts. + + +### Limitations + +* Keys and values must be byte slices. Other types must be marshaled before + storing them in the cache. +* Big entries with sizes exceeding 64KB must be stored via [distinct API](http://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SetBig). +* There is no cache expiration. Entries are evicted from the cache only + on cache size overflow. Entry deadline may be stored inside the value in order + to implement cache expiration. + + +### Architecture details + +The cache uses ideas from [BigCache](https://github.com/allegro/bigcache): + +* The cache consists of many buckets, each with its own lock. + This helps scaling the performance on multi-core CPUs, since multiple + CPUs may concurrently access distinct buckets. +* Each bucket consists of a `hash(key) -> (key, value) position` map + and 64KB-sized byte slices (chunks) holding encoded `(key, value)` entries. + Each bucket contains only `O(chunksCount)` pointers. For instance, 64GB cache + would contain ~1M pointers, while similarly-sized `map[string][]byte` + would contain ~1B pointers for short keys and values. This would lead to + [huge GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). + +64KB-sized chunks reduce memory fragmentation and the total memory usage comparing +to a single big chunk per bucket. +Chunks are allocated off-heap if possible. This reduces total memory usage because +GC collects unused memory more frequently without the need in `GOGC` tweaking. + + +### Users + +* `Fastcache` has been extracted from [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) sources. + See [this article](https://medium.com/devopslinks/victoriametrics-creating-the-best-remote-storage-for-prometheus-5d92d66787ac) + for more info about `VictoriaMetrics`. + + +### FAQ + +#### What is the difference between `fastcache` and other similar caches like [BigCache](https://github.com/allegro/bigcache) or [FreeCache](https://github.com/coocood/freecache)? + +* `Fastcache` is faster. See benchmark results above. +* `Fastcache` uses less memory due to lower heap fragmentation. 
This allows + saving many GBs of memory on multi-GB caches. +* `Fastcache` API [is simpler](http://godoc.org/github.com/VictoriaMetrics/fastcache). + The API is designed to be used in zero-allocation mode. + + +#### Why `fastcache` doesn't support cache expiration? + +Because we don't need cache expiration in [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics). +Cached entries inside `VictoriaMetrics` never expire. They are automatically evicted on cache size overflow. + +It is easy to implement cache expiration on top of `fastcache` by caching values +with marshaled deadlines and verifying deadlines after reading these values +from the cache. + + +#### Why `fastcache` doesn't support advanced features such as [thundering herd protection](https://en.wikipedia.org/wiki/Thundering_herd_problem) or callbacks on entries' eviction? + +Because these features would complicate the code and would make it slower. +`Fastcache` source code is simple - just copy-paste it and implement the feature you want +on top of it. diff --git a/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go new file mode 100644 index 000000000000..7ca6f483af2f --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go @@ -0,0 +1,152 @@ +package fastcache + +import ( + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +// maxSubvalueLen is the maximum size of subvalue chunk. +// +// - 16 bytes are for subkey encoding +// - 4 bytes are for len(key)+len(value) encoding inside fastcache +// - 1 byte is implementation detail of fastcache +const maxSubvalueLen = chunkSize - 16 - 4 - 1 + +// maxKeyLen is the maximum size of key. +// +// - 16 bytes are for (hash + valueLen) +// - 4 bytes are for len(key)+len(subkey) +// - 1 byte is implementation detail of fastcache +const maxKeyLen = chunkSize - 16 - 4 - 1 + +// SetBig sets (k, v) to c where len(v) may exceed 64KB. +// +// GetBig must be used for reading stored values. +// +// The stored entry may be evicted at any time either due to cache +// overflow or due to unlikely hash collision. +// Pass higher maxBytes value to New if the added items disappear +// frequently. +// +// It is safe to store entries smaller than 64KB with SetBig. +// +// k and v contents may be modified after returning from SetBig. +func (c *Cache) SetBig(k, v []byte) { + atomic.AddUint64(&c.bigStats.SetBigCalls, 1) + if len(k) > maxKeyLen { + atomic.AddUint64(&c.bigStats.TooBigKeyErrors, 1) + return + } + valueLen := len(v) + valueHash := xxhash.Sum64(v) + + // Split v into chunks with up to 64Kb each. + subkey := getSubkeyBuf() + var i uint64 + for len(v) > 0 { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + subvalueLen := maxSubvalueLen + if len(v) < subvalueLen { + subvalueLen = len(v) + } + subvalue := v[:subvalueLen] + v = v[subvalueLen:] + c.Set(subkey.B, subvalue) + } + + // Write metavalue, which consists of valueHash and valueLen. + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(valueLen)) + c.Set(k, subkey.B) + putSubkeyBuf(subkey) +} + +// GetBig searches for the value for the given k, appends it to dst +// and returns the result. +// +// GetBig returns only values stored via SetBig. It doesn't work +// with values stored via other methods. +// +// k contents may be modified after returning from GetBig. 
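+//
+// Illustrative use (the key and value names below are hypothetical,
+// shown only for clarity):
+//
+//	c.SetBig([]byte("blob"), hugeValue)   // hugeValue may exceed 64KB
+//	blob := c.GetBig(nil, []byte("blob")) // empty result: evicted or never stored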
+func (c *Cache) GetBig(dst, k []byte) []byte { + atomic.AddUint64(&c.bigStats.GetBigCalls, 1) + subkey := getSubkeyBuf() + defer putSubkeyBuf(subkey) + + // Read and parse metavalue + subkey.B = c.Get(subkey.B[:0], k) + if len(subkey.B) == 0 { + // Nothing found. + return dst + } + if len(subkey.B) != 16 { + atomic.AddUint64(&c.bigStats.InvalidMetavalueErrors, 1) + return dst + } + valueHash := unmarshalUint64(subkey.B) + valueLen := unmarshalUint64(subkey.B[8:]) + + // Collect result from chunks. + dstLen := len(dst) + if n := dstLen + int(valueLen) - cap(dst); n > 0 { + dst = append(dst[:cap(dst)], make([]byte, n)...) + } + dst = dst[:dstLen] + var i uint64 + for uint64(len(dst)-dstLen) < valueLen { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + dstNew := c.Get(dst, subkey.B) + if len(dstNew) == len(dst) { + // Cannot find subvalue + return dst[:dstLen] + } + dst = dstNew + } + + // Verify the obtained value. + v := dst[dstLen:] + if uint64(len(v)) != valueLen { + atomic.AddUint64(&c.bigStats.InvalidValueLenErrors, 1) + return dst[:dstLen] + } + h := xxhash.Sum64(v) + if h != valueHash { + atomic.AddUint64(&c.bigStats.InvalidValueHashErrors, 1) + return dst[:dstLen] + } + return dst +} + +func getSubkeyBuf() *bytesBuf { + v := subkeyPool.Get() + if v == nil { + return &bytesBuf{} + } + return v.(*bytesBuf) +} + +func putSubkeyBuf(bb *bytesBuf) { + bb.B = bb.B[:0] + subkeyPool.Put(bb) +} + +var subkeyPool sync.Pool + +type bytesBuf struct { + B []byte +} + +func marshalUint64(dst []byte, u uint64) []byte { + return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32), byte(u>>24), byte(u>>16), byte(u>>8), byte(u)) +} + +func unmarshalUint64(src []byte) uint64 { + _ = src[7] + return uint64(src[0])<<56 | uint64(src[1])<<48 | uint64(src[2])<<40 | uint64(src[3])<<32 | uint64(src[4])<<24 | uint64(src[5])<<16 | uint64(src[6])<<8 | uint64(src[7]) +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go new file mode 100644 index 000000000000..7b5c7c21b457 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go @@ -0,0 +1,403 @@ +// Package fastcache implements fast in-memory cache. 
+// +// The package has been extracted from https://victoriametrics.com/ +package fastcache + +import ( + "fmt" + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +const bucketsCount = 512 + +const chunkSize = 64 * 1024 + +const bucketSizeBits = 40 + +const genSizeBits = 64 - bucketSizeBits + +const maxGen = 1<= maxBucketSize { + panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize)) + } + maxChunks := (maxBytes + chunkSize - 1) / chunkSize + b.chunks = make([][]byte, maxChunks) + b.m = make(map[uint64]uint64) + b.Reset() +} + +func (b *bucket) Reset() { + b.mu.Lock() + chunks := b.chunks + for i := range chunks { + putChunk(chunks[i]) + chunks[i] = nil + } + bm := b.m + for k := range bm { + delete(bm, k) + } + b.idx = 0 + b.gen = 1 + atomic.StoreUint64(&b.getCalls, 0) + atomic.StoreUint64(&b.setCalls, 0) + atomic.StoreUint64(&b.misses, 0) + atomic.StoreUint64(&b.collisions, 0) + atomic.StoreUint64(&b.corruptions, 0) + b.mu.Unlock() +} + +func (b *bucket) Clean() { + b.mu.Lock() + bGen := b.gen & ((1 << genSizeBits) - 1) + bIdx := b.idx + bm := b.m + for k, v := range bm { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if gen == bGen && idx < bIdx || gen+1 == bGen && idx >= bIdx || gen == maxGen && bGen == 1 && idx >= bIdx { + continue + } + delete(bm, k) + } + b.mu.Unlock() +} + +func (b *bucket) UpdateStats(s *Stats) { + s.GetCalls += atomic.LoadUint64(&b.getCalls) + s.SetCalls += atomic.LoadUint64(&b.setCalls) + s.Misses += atomic.LoadUint64(&b.misses) + s.Collisions += atomic.LoadUint64(&b.collisions) + s.Corruptions += atomic.LoadUint64(&b.corruptions) + + b.mu.RLock() + s.EntriesCount += uint64(len(b.m)) + for _, chunk := range b.chunks { + s.BytesSize += uint64(cap(chunk)) + } + b.mu.RUnlock() +} + +func (b *bucket) Set(k, v []byte, h uint64) { + setCalls := atomic.AddUint64(&b.setCalls, 1) + if setCalls%(1<<14) == 0 { + b.Clean() + } + + if len(k) >= (1<<16) || len(v) >= (1<<16) { + // Too big key or value - its length cannot be encoded + // with 2 bytes (see below). Skip the entry. + return + } + var kvLenBuf [4]byte + kvLenBuf[0] = byte(uint16(len(k)) >> 8) + kvLenBuf[1] = byte(len(k)) + kvLenBuf[2] = byte(uint16(len(v)) >> 8) + kvLenBuf[3] = byte(len(v)) + kvLen := uint64(len(kvLenBuf) + len(k) + len(v)) + if kvLen >= chunkSize { + // Do not store too big keys and values, since they do not + // fit a chunk. + return + } + + b.mu.Lock() + idx := b.idx + idxNew := idx + kvLen + chunkIdx := idx / chunkSize + chunkIdxNew := idxNew / chunkSize + if chunkIdxNew > chunkIdx { + if chunkIdxNew >= uint64(len(b.chunks)) { + idx = 0 + idxNew = kvLen + chunkIdx = 0 + b.gen++ + if b.gen&((1< 0 { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if gen == bGen && idx < b.idx || gen+1 == bGen && idx >= b.idx || gen == maxGen && bGen == 1 && idx >= b.idx { + chunkIdx := idx / chunkSize + if chunkIdx >= uint64(len(b.chunks)) { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + chunk := b.chunks[chunkIdx] + idx %= chunkSize + if idx+4 >= chunkSize { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + kvLenBuf := chunk[idx : idx+4] + keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1]) + valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3]) + idx += 4 + if idx+keyLen+valLen >= chunkSize { + // Corrupted data during the load from file. Just skip it. 
+ atomic.AddUint64(&b.corruptions, 1) + goto end + } + if string(k) == string(chunk[idx:idx+keyLen]) { + idx += keyLen + if returnDst { + dst = append(dst, chunk[idx:idx+valLen]...) + } + found = true + } else { + atomic.AddUint64(&b.collisions, 1) + } + } + } +end: + b.mu.RUnlock() + if !found { + atomic.AddUint64(&b.misses, 1) + } + return dst, found +} + +func (b *bucket) Del(h uint64) { + b.mu.Lock() + delete(b.m, h) + b.mu.Unlock() +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/file.go b/vendor/github.com/VictoriaMetrics/fastcache/file.go new file mode 100644 index 000000000000..9c291afb0cab --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/file.go @@ -0,0 +1,397 @@ +package fastcache + +import ( + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "runtime" + + "github.com/golang/snappy" +) + +// SaveToFile atomically saves cache data to the given filePath using a single +// CPU core. +// +// SaveToFile may be called concurrently with other operations on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFileConcurrent for faster saving to file. +func (c *Cache) SaveToFile(filePath string) error { + return c.SaveToFileConcurrent(filePath, 1) +} + +// SaveToFileConcurrent saves cache data to the given filePath using concurrency +// CPU cores. +// +// SaveToFileConcurrent may be called concurrently with other operations +// on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFile. +func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error { + // Create dir if it doesn't exist. + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("cannot stat %q: %s", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("cannot create dir %q: %s", dir, err) + } + } + + // Save cache data into a temporary directory. + tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.") + if err != nil { + return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err) + } + defer func() { + if tmpDir != "" { + _ = os.RemoveAll(tmpDir) + } + }() + gomaxprocs := runtime.GOMAXPROCS(-1) + if concurrency <= 0 || concurrency > gomaxprocs { + concurrency = gomaxprocs + } + if err := c.save(tmpDir, concurrency); err != nil { + return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err) + } + + // Remove old filePath contents, since os.Rename may return + // error if filePath dir exists. + if err := os.RemoveAll(filePath); err != nil { + return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err) + } + if err := os.Rename(tmpDir, filePath); err != nil { + return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err) + } + tmpDir = "" + return nil +} + +// LoadFromFile loads cache data from the given filePath. +// +// See SaveToFile* for saving cache data to file. +func LoadFromFile(filePath string) (*Cache, error) { + return load(filePath, 0) +} + +// LoadFromFileOrNew tries loading cache data from the given filePath. +// +// The function falls back to creating new cache with the given maxBytes +// capacity if error occurs during loading the cache from file. 
+func LoadFromFileOrNew(filePath string, maxBytes int) *Cache { + c, err := load(filePath, maxBytes) + if err == nil { + return c + } + return New(maxBytes) +} + +func (c *Cache) save(dir string, workersCount int) error { + if err := saveMetadata(c, dir); err != nil { + return err + } + + // Save buckets by workersCount concurrent workers. + workCh := make(chan int, workersCount) + results := make(chan error) + for i := 0; i < workersCount; i++ { + go func(workerNum int) { + results <- saveBuckets(c.buckets[:], workCh, dir, workerNum) + }(i) + } + // Feed workers with work + for i := range c.buckets[:] { + workCh <- i + } + close(workCh) + + // Read results. + var err error + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err != nil { + err = result + } + } + return err +} + +func load(filePath string, maxBytes int) (*Cache, error) { + maxBucketChunks, err := loadMetadata(filePath) + if err != nil { + return nil, err + } + if maxBytes > 0 { + maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount) + expectedBucketChunks := (maxBucketBytes + chunkSize - 1) / chunkSize + if maxBucketChunks != expectedBucketChunks { + return nil, fmt.Errorf("cache file %s contains maxBytes=%d; want %d", filePath, maxBytes, expectedBucketChunks*chunkSize*bucketsCount) + } + } + + // Read bucket files from filePath dir. + d, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("cannot open %q: %s", filePath, err) + } + defer func() { + _ = d.Close() + }() + fis, err := d.Readdir(-1) + if err != nil { + return nil, fmt.Errorf("cannot read files from %q: %s", filePath, err) + } + results := make(chan error) + workersCount := 0 + var c Cache + for _, fi := range fis { + fn := fi.Name() + if fi.IsDir() || !dataFileRegexp.MatchString(fn) { + continue + } + workersCount++ + go func(dataPath string) { + results <- loadBuckets(c.buckets[:], dataPath, maxBucketChunks) + }(filePath + "/" + fn) + } + err = nil + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + if err != nil { + return nil, err + } + return &c, nil +} + +func saveMetadata(c *Cache, dir string) error { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Create(metadataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks := uint64(cap(c.buckets[0].chunks)) + if err := writeUint64(metadataFile, maxBucketChunks); err != nil { + return fmt.Errorf("cannot write maxBucketChunks=%d to %q: %s", maxBucketChunks, metadataPath, err) + } + return nil +} + +func loadMetadata(dir string) (uint64, error) { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Open(metadataPath) + if err != nil { + return 0, fmt.Errorf("cannot open %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks, err := readUint64(metadataFile) + if err != nil { + return 0, fmt.Errorf("cannot read maxBucketChunks from %q: %s", metadataPath, err) + } + return maxBucketChunks, nil +} + +var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`) + +func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error { + dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum) + dataFile, err := os.Create(dataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zw := 
snappy.NewBufferedWriter(dataFile) + for bucketNum := range workCh { + if err := writeUint64(zw, uint64(bucketNum)); err != nil { + return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err) + } + if err := buckets[bucketNum].Save(zw); err != nil { + return fmt.Errorf("cannot save bucket[%d] to %q: %s", bucketNum, dataPath, err) + } + } + if err := zw.Close(); err != nil { + return fmt.Errorf("cannot close snappy.Writer for %q: %s", dataPath, err) + } + return nil +} + +func loadBuckets(buckets []bucket, dataPath string, maxChunks uint64) error { + dataFile, err := os.Open(dataPath) + if err != nil { + return fmt.Errorf("cannot open %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zr := snappy.NewReader(dataFile) + for { + bucketNum, err := readUint64(zr) + if err == io.EOF { + // Reached the end of file. + return nil + } + if bucketNum >= uint64(len(buckets)) { + return fmt.Errorf("unexpected bucketNum read from %q: %d; must be smaller than %d", dataPath, bucketNum, len(buckets)) + } + if err := buckets[bucketNum].Load(zr, maxChunks); err != nil { + return fmt.Errorf("cannot load bucket[%d] from %q: %s", bucketNum, dataPath, err) + } + } +} + +func (b *bucket) Save(w io.Writer) error { + b.Clean() + + b.mu.RLock() + defer b.mu.RUnlock() + + // Store b.idx, b.gen and b.m to w. + + bIdx := b.idx + bGen := b.gen + chunksLen := 0 + for _, chunk := range b.chunks { + if chunk == nil { + break + } + chunksLen++ + } + kvs := make([]byte, 0, 2*8*len(b.m)) + var u64Buf [8]byte + for k, v := range b.m { + binary.LittleEndian.PutUint64(u64Buf[:], k) + kvs = append(kvs, u64Buf[:]...) + binary.LittleEndian.PutUint64(u64Buf[:], v) + kvs = append(kvs, u64Buf[:]...) + } + + if err := writeUint64(w, bIdx); err != nil { + return fmt.Errorf("cannot write b.idx: %s", err) + } + if err := writeUint64(w, bGen); err != nil { + return fmt.Errorf("cannot write b.gen: %s", err) + } + if err := writeUint64(w, uint64(len(kvs))/2/8); err != nil { + return fmt.Errorf("cannot write len(b.m): %s", err) + } + if _, err := w.Write(kvs); err != nil { + return fmt.Errorf("cannot write b.m: %s", err) + } + + // Store b.chunks to w. 
+ if err := writeUint64(w, uint64(chunksLen)); err != nil { + return fmt.Errorf("cannot write len(b.chunks): %s", err) + } + for chunkIdx := 0; chunkIdx < chunksLen; chunkIdx++ { + chunk := b.chunks[chunkIdx][:chunkSize] + if _, err := w.Write(chunk); err != nil { + return fmt.Errorf("cannot write b.chunks[%d]: %s", chunkIdx, err) + } + } + + return nil +} + +func (b *bucket) Load(r io.Reader, maxChunks uint64) error { + bIdx, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.idx: %s", err) + } + bGen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.gen: %s", err) + } + kvsLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.m): %s", err) + } + kvsLen *= 2 * 8 + kvs := make([]byte, kvsLen) + if _, err := io.ReadFull(r, kvs); err != nil { + return fmt.Errorf("cannot read b.m: %s", err) + } + m := make(map[uint64]uint64, kvsLen/2/8) + for len(kvs) > 0 { + k := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + v := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + m[k] = v + } + + maxBytes := maxChunks * chunkSize + if maxBytes >= maxBucketSize { + return fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize) + } + chunks := make([][]byte, maxChunks) + chunksLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.chunks): %s", err) + } + if chunksLen > uint64(maxChunks) { + return fmt.Errorf("chunksLen=%d cannot exceed maxChunks=%d", chunksLen, maxChunks) + } + currChunkIdx := bIdx / chunkSize + if currChunkIdx > 0 && currChunkIdx >= chunksLen { + return fmt.Errorf("too big bIdx=%d; should be smaller than %d", bIdx, chunksLen*chunkSize) + } + for chunkIdx := uint64(0); chunkIdx < chunksLen; chunkIdx++ { + chunk := getChunk() + if _, err := io.ReadFull(r, chunk); err != nil { + return fmt.Errorf("cannot read b.chunks[%d]: %s", chunkIdx, err) + } + chunks[chunkIdx] = chunk + } + // Adjust len for the chunk pointed by currChunkIdx. 
+ if chunksLen > 0 { + chunkLen := bIdx % chunkSize + chunks[currChunkIdx] = chunks[currChunkIdx][:chunkLen] + } + + b.mu.Lock() + for _, chunk := range b.chunks { + putChunk(chunk) + } + b.chunks = chunks + b.m = m + b.idx = bIdx + b.gen = bGen + b.mu.Unlock() + + return nil +} + +func writeUint64(w io.Writer, u uint64) error { + var u64Buf [8]byte + binary.LittleEndian.PutUint64(u64Buf[:], u) + _, err := w.Write(u64Buf[:]) + return err +} + +func readUint64(r io.Reader) (uint64, error) { + var u64Buf [8]byte + if _, err := io.ReadFull(r, u64Buf[:]); err != nil { + return 0, err + } + u := binary.LittleEndian.Uint64(u64Buf[:]) + return u, nil +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/go.mod b/vendor/github.com/VictoriaMetrics/fastcache/go.mod new file mode 100644 index 000000000000..9b0693f31b5b --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/go.mod @@ -0,0 +1,11 @@ +module github.com/VictoriaMetrics/fastcache + +require ( + github.com/OneOfOne/xxhash v1.2.5 // indirect + github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 + github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18 + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/snappy v0.0.1 + github.com/spaolacci/murmur3 v1.0.1-0.20190317074736-539464a789e9 // indirect + github.com/stretchr/testify v1.3.0 // indirect +) diff --git a/vendor/github.com/VictoriaMetrics/fastcache/go.sum b/vendor/github.com/VictoriaMetrics/fastcache/go.sum new file mode 100644 index 000000000000..3ff9edb14cba --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/go.sum @@ -0,0 +1,24 @@ +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/OneOfOne/xxhash v1.2.5 h1:zl/OfRA6nftbBK9qTohYBJ5xvw6C/oNKizR7cZGl3cI= +github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18 h1:pl4eWIqvFe/Kg3zkn7NxevNzILnZYWDCG7qbA1CJik0= +github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18/go.mod h1:HD5P3vAIAh+Y2GAxg0PrPN1P8WkepXGpjbUPDHJqqKM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.0.1-0.20190317074736-539464a789e9 
h1:5Cp3cVwpQP4aCQ6jx6dNLP3IarbYiuStmIzYu+BjQwY= +github.com/spaolacci/murmur3 v1.0.1-0.20190317074736-539464a789e9/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go new file mode 100644 index 000000000000..79a71832adbd --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go @@ -0,0 +1,11 @@ +// +build appengine windows + +package fastcache + +func getChunk() []byte { + return make([]byte, chunkSize) +} + +func putChunk(chunk []byte) { + // No-op. +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go new file mode 100644 index 000000000000..424b79b43ac3 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go @@ -0,0 +1,52 @@ +// +build !appengine,!windows + +package fastcache + +import ( + "fmt" + "sync" + "syscall" + "unsafe" +) + +const chunksPerAlloc = 1024 + +var ( + freeChunks []*[chunkSize]byte + freeChunksLock sync.Mutex +) + +func getChunk() []byte { + freeChunksLock.Lock() + if len(freeChunks) == 0 { + // Allocate offheap memory, so GOGC won't take into account cache size. + // This should reduce free memory waste. + data, err := syscall.Mmap(-1, 0, chunkSize*chunksPerAlloc, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) + if err != nil { + panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err)) + } + for len(data) > 0 { + p := (*[chunkSize]byte)(unsafe.Pointer(&data[0])) + freeChunks = append(freeChunks, p) + data = data[chunkSize:] + } + } + n := len(freeChunks) - 1 + p := freeChunks[n] + freeChunks[n] = nil + freeChunks = freeChunks[:n] + freeChunksLock.Unlock() + return p[:] +} + +func putChunk(chunk []byte) { + if chunk == nil { + return + } + chunk = chunk[:chunkSize] + p := (*[chunkSize]byte)(unsafe.Pointer(&chunk[0])) + + freeChunksLock.Lock() + freeChunks = append(freeChunks, p) + freeChunksLock.Unlock() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 33aab0e11907..701fc832356e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -20,6 +20,12 @@ "revision": "5d049714c4a64225c3c79a7cf7d02f7fb5b96338", "revisionTime": "2018-01-16T20:38:02Z" }, + { + "checksumSHA1": "wp83b8O9IQzAMCHScQ/2NdGZGHE=", + "path": "github.com/VictoriaMetrics/fastcache", + "revision": "eb9424710c80f12784099cce35e3999502c146a5", + "revisionTime": "2019-06-11T20:51:40Z" + }, { "checksumSHA1": "8skJYOdQytXjimcDPLRW4tonX3A=", "path": "github.com/allegro/bigcache", @@ -56,6 +62,10 @@ "revision": "165db2f241fd235aec29ba6d9b1ccd5f1c14637c", "revisionTime": "2015-01-22T07:26:53Z" }, + { + "path": "github.com/cespare/xxhash/v2", + "revision": "" + }, { "checksumSHA1": "dvabztWVQX8f6oMLRyv4dLH+TGY=", "path": "github.com/davecgh/go-spew/spew",