Skip to content

Commit

Permalink
blockchain: Introduce slice of maps for the cache map
Browse files Browse the repository at this point in the history
A standard map in the go library only allocates 2^N buckets.  Bucket
size is a fixed value (16+(keysize*8)+(valuesize*8)).  This means that
a map will always double in size when allocating more key/value pairs
to fit into the map.

This breaks the utxomaxcachesize promise that we make to the user and
for bigger sizes, the bigger the difference is.  For example, if the
user gives a utxomaxcache size of 18,000MiB, the map size will be around
13,000MiB.  However, when this map gets filled up, the next size is
around 27,000MiB, passing the 18,000MiB memory limit that the user gave.

Because of this, the commit fixes this problem by having a slice of
maps.  This enables us to keep the size of the maps under the limit that
the user gave us as we can allocate many maps at different sizes.
  • Loading branch information
kcalvinalvin committed Mar 15, 2023
1 parent c2bf96c commit b34a14b
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 61 deletions.
214 changes: 171 additions & 43 deletions blockchain/utxocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,125 @@ type utxoBatcher interface {
getEntries(outpoints []wire.OutPoint) ([]*UtxoEntry, error)
}

// mapSlice is a slice of maps for Utxo entries. The slice of maps are needed to
// guarantee that the map will only take up N amount of bytes. As of v1.20, the
// go runtime will allocate 2^N + few extra buckets, meaning that for large N, we'll
// allocate a lot of extra memory if the amount of entries goes over the previously
// allocated buckets. A slice of maps allows us to have a better control of how much
// total memory gets allocated by all the maps.
type mapSlice struct {
	// maps is the underlying storage. Each map is capped at the entry
	// count held in the matching index of maxEntries.
	maps []map[wire.OutPoint]*UtxoEntry

	// maxEntries is the maximum amount of elements that the map is allocated for.
	maxEntries []int

	// maxTotalMemoryUsage is the maximum memory usage in bytes that the state
	// should contain in normal circumstances.
	maxTotalMemoryUsage uint64
}

// length returns the combined number of entries held across every map in
// the map slice.
func (ms *mapSlice) length() int {
	total := 0
	for i := range ms.maps {
		total += len(ms.maps[i])
	}

	return total
}

// size returns the estimated memory footprint, in bytes, of all the maps in
// the map slice added together.
func (ms *mapSlice) size() int {
	total := 0
	for i := range ms.maxEntries {
		total += CalculateRoughMapSize(ms.maxEntries[i], bucketSize)
	}

	return total
}

// get looks for the outpoint in all the maps in the map slice and returns
// the entry. nil and false is returned if the outpoint is not found.
func (ms *mapSlice) get(op wire.OutPoint) (*UtxoEntry, bool) {
	for _, m := range ms.maps {
		if entry, ok := m[op]; ok {
			return entry, true
		}
	}

	return nil, false
}

// put puts the outpoint and the entry into one of the maps in the map slice. If the
// existing maps are all full, it will allocate a new map based on how much memory we
// have left over. Leftover memory is calculated as:
// maxTotalMemoryUsage - (totalEntryMemory + mapSlice.size())
func (ms *mapSlice) put(op wire.OutPoint, entry *UtxoEntry, totalEntryMemory uint64) {
	for i, maxNum := range ms.maxEntries {
		m := ms.maps[i]
		if _, found := m[op]; found {
			// If the key is found, overwrite it. Return right away
			// so the entry is never duplicated into another map.
			m[op] = entry
			return
		}
		if len(m) >= maxNum {
			// Don't try to insert if the map already at max since
			// that'll force the map to allocate double the memory it's
			// currently taking up.
			continue
		}

		// This map has room. Store the entry here and stop so the
		// outpoint only ever lives in a single map; continuing the
		// loop would insert a duplicate into every non-full map.
		m[op] = entry
		return
	}

	// All existing maps are full. Allocate a new map sized to the leftover
	// memory budget and place the entry there.
	ms.makeNewMap(totalEntryMemory)
	m := ms.maps[len(ms.maps)-1]
	m[op] = entry
}

// delete attempts to delete the given outpoint in all of the maps. No-op if the
// outpoint doesn't exist.
func (ms *mapSlice) delete(op wire.OutPoint) {
	for _, m := range ms.maps {
		delete(m, op)
	}
}

// makeNewMap makes and appends the new map into the map slice. The new map is
// sized from whatever memory budget is left after accounting for the entries
// and the maps already allocated.
func (ms *mapSlice) makeNewMap(totalEntryMemory uint64) {
	// Get the size of the memory already in use: the entries plus the
	// rough size of every map allocated so far.
	used := totalEntryMemory
	for _, maxNum := range ms.maxEntries {
		used += uint64(CalculateRoughMapSize(maxNum, bucketSize))
	}

	// Guard the unsigned subtraction: if we're already at or over budget,
	// treat the leftover as zero instead of wrapping around to a huge
	// value (which would allocate an enormous map).
	var memSize uint64
	if ms.maxTotalMemoryUsage > used {
		memSize = ms.maxTotalMemoryUsage - used
	}

	// Get a new map that's sized to house inside the leftover memory.
	numMaxElements := CalculateMinEntries(int(memSize), bucketSize+avgEntrySize)
	numMaxElements -= 1
	if numMaxElements < 0 {
		// make panics on a negative size hint; clamp so the caller can
		// still insert the entry that triggered this allocation.
		numMaxElements = 0
	}
	ms.maxEntries = append(ms.maxEntries, numMaxElements)
	ms.maps = append(ms.maps, make(map[wire.OutPoint]*UtxoEntry, numMaxElements))
}

// deleteMaps deletes all maps except for the first one which should be the biggest.
func (ms *mapSlice) deleteMaps() {
	// Re-slice into freshly allocated slices so the references to the
	// dropped maps are released and they become garbage collectable.
	first := ms.maps[0]
	firstMax := ms.maxEntries[0]

	ms.maps = []map[wire.OutPoint]*UtxoEntry{first}
	ms.maxEntries = []int{firstMax}
}

// utxoCache is a cached utxo view in the chainstate of a BlockChain.
//
// It implements the utxoView interface, but should only be used as such with the
Expand All @@ -246,9 +365,6 @@ type utxoCache struct {
// should contain in normal circumstances.
maxTotalMemoryUsage uint64

// maxElements is the maximum amount of elemnts that the map is allocated for.
maxElements int

// This mutex protects the internal state.
// A simple mutex instead of a read-write mutex is chosen because the main
// read method also possibly does a write on a cache miss.
Expand All @@ -258,7 +374,7 @@ type utxoCache struct {
// flag indicates that the state of the entry (potentially) deviates from the
// state in the database. Explicit nil values in the map are used to
// indicate that the database does not contain the entry.
cachedEntries map[wire.OutPoint]*UtxoEntry
cachedEntries mapSlice
totalEntryMemory uint64 // Total memory usage in bytes.
lastFlushHash chainhash.Hash
}
Expand All @@ -268,15 +384,21 @@ type utxoCache struct {
// newUtxoCache returns a utxoCache backed by db whose cached entries plus map
// overhead aim to stay within maxTotalMemoryUsage bytes.
func newUtxoCache(db database.DB, maxTotalMemoryUsage uint64) *utxoCache {
	// While the entry isn't included in the map size, add the average size to the
	// bucket size so we get some leftover space for entries to take up.
	numMaxElements := CalculateMinEntries(int(maxTotalMemoryUsage), bucketSize+avgEntrySize)
	numMaxElements -= 1

	log.Infof("Pre-allocating for %d MiB: ", maxTotalMemoryUsage/(1024*1024)+1)

	m := make(map[wire.OutPoint]*UtxoEntry, numMaxElements)

	return &utxoCache{
		db:                  db,
		maxTotalMemoryUsage: maxTotalMemoryUsage,
		cachedEntries: mapSlice{
			maps:                []map[wire.OutPoint]*UtxoEntry{m},
			maxEntries:          []int{numMaxElements},
			maxTotalMemoryUsage: maxTotalMemoryUsage,
		},
	}
}

Expand All @@ -286,7 +408,7 @@ func newUtxoCache(db database.DB, maxTotalMemoryUsage uint64) *utxoCache {
func (s *utxoCache) totalMemoryUsage() uint64 {
// Total memory is the map size + the size that the utxo entries are
// taking up.
size := uint64(CalculateRoughMapSize(s.maxElements, bucketSize))
size := uint64(s.cachedEntries.size())
size += s.totalEntryMemory

return size
Expand Down Expand Up @@ -334,8 +456,8 @@ func (s *utxoCache) fetchAndCacheEntries(ops []wire.OutPoint) ([]*UtxoEntry, err
// as a miss; this prevents future lookups to perform the same database
// fetch.
for i, entry := range entries {
s.cachedEntries[ops[i]] = entry
s.totalEntryMemory += entry.memoryUsage()
s.cachedEntries.put(ops[i], entry, s.totalEntryMemory)
}

return entries, nil
Expand All @@ -350,7 +472,8 @@ func (s *utxoCache) fetchAndCacheEntries(ops []wire.OutPoint) ([]*UtxoEntry, err
func (s *utxoCache) getEntry(outpoint wire.OutPoint) (*UtxoEntry, error) {
// First, we'll check out in-memory cache to see if we already have the
// entry.
if entry, found := s.cachedEntries[outpoint]; found {
entry, found := s.cachedEntries.get(outpoint)
if found {
return entry, nil
}

Expand All @@ -374,7 +497,7 @@ func (s *utxoCache) getEntries(outpoints []wire.OutPoint) ([]*UtxoEntry, error)
missingOps := make([]wire.OutPoint, 0, len(outpoints))
missingOpsIdx := make([]int, 0, len(outpoints))
for i, op := range outpoints {
if entry, ok := s.cachedEntries[op]; ok {
if entry, ok := s.cachedEntries.get(op); ok {
entries[i] = entry
continue
}
Expand Down Expand Up @@ -437,8 +560,8 @@ func (s *utxoCache) getEntryByHash(hash *chainhash.Hash) (*UtxoEntry, error) {
prevOut := wire.OutPoint{Hash: *hash}
for idx := uint32(0); idx < MaxOutputsPerBlock; idx++ {
prevOut.Index = idx
if entry, _ := s.cachedEntries[prevOut]; entry != nil {
return entry.Clone(), nil
if entry, _ := s.cachedEntries.get(prevOut); entry != nil {
return entry, nil
}
}

Expand Down Expand Up @@ -472,7 +595,7 @@ func (s *utxoCache) FetchEntryByHash(hash *chainhash.Hash) (*UtxoEntry, error) {
// This method is part of the utxoView interface.
// This method should be called with the state lock held.
func (s *utxoCache) spendEntry(outpoint wire.OutPoint, addIfNil *UtxoEntry) error {
entry := s.cachedEntries[outpoint]
entry, _ := s.cachedEntries.get(outpoint)

// If we don't have an entry in cache and an entry was provided, we add it.
if entry == nil && addIfNil != nil {
Expand All @@ -493,7 +616,7 @@ func (s *utxoCache) spendEntry(outpoint wire.OutPoint, addIfNil *UtxoEntry) erro
// We don't delete it from the map, but set the value to nil, so that
// later lookups for the entry know that the entry does not exist in the
// database.
s.cachedEntries[outpoint] = nil
s.cachedEntries.put(outpoint, nil, s.totalEntryMemory)
s.totalEntryMemory -= entry.memoryUsage()
return nil
}
Expand All @@ -516,7 +639,7 @@ func (s *utxoCache) addEntry(outpoint wire.OutPoint, entry *UtxoEntry, overwrite
return nil
}

cachedEntry := s.cachedEntries[outpoint]
cachedEntry, _ := s.cachedEntries.get(outpoint)

// In overwrite mode, simply add the entry without doing these checks.
if !overwrite {
Expand All @@ -539,7 +662,7 @@ func (s *utxoCache) addEntry(outpoint wire.OutPoint, entry *UtxoEntry, overwrite
}

entry.packedFlags |= tfModified
s.cachedEntries[outpoint] = entry
s.cachedEntries.put(outpoint, entry, s.totalEntryMemory)
s.totalEntryMemory -= cachedEntry.memoryUsage() // 0 for nil
s.totalEntryMemory += entry.memoryUsage()
return nil
Expand Down Expand Up @@ -606,7 +729,7 @@ func (s *utxoCache) Commit(view *UtxoViewpoint, node *blockNode) error {

// We can't use the view entry directly because it can be modified
// later on.
cachedEntry := s.cachedEntries[outpoint]
cachedEntry, _ := s.cachedEntries.get(outpoint)
if cachedEntry == nil {
cachedEntry = entry
}
Expand Down Expand Up @@ -679,39 +802,42 @@ func (s *utxoCache) flush(bestState *BestState) error {
// Store all entries in batches.
flushBatch := func(dbTx database.Tx) error {
nbBatchEntries := 0
for outpoint, entry := range s.cachedEntries {
// Nil entries or unmodified entries can just be pruned.
// They don't count for the batch size.
if entry == nil || !entry.isModified() {
s.totalEntryMemory -= entry.memoryUsage()
delete(s.cachedEntries, outpoint)
continue
}

if entry.IsSpent() {
if err := dbDeleteUtxoEntry(dbTx, outpoint); err != nil {
return err
for _, m := range s.cachedEntries.maps {
for outpoint, entry := range m {
// Nil entries or unmodified entries can just be pruned.
// They don't count for the batch size.
if entry == nil || !entry.isModified() {
s.totalEntryMemory -= entry.memoryUsage()
delete(m, outpoint)
continue
}
} else {
if err := dbPutUtxoEntry(dbTx, outpoint, entry); err != nil {
return err

if entry.IsSpent() {
if err := dbDeleteUtxoEntry(dbTx, outpoint); err != nil {
return err
}
} else {
if err := dbPutUtxoEntry(dbTx, outpoint, entry); err != nil {
return err
}
}
}
nbBatchEntries++
nbBatchEntries++

s.totalEntryMemory -= entry.memoryUsage()
delete(s.cachedEntries, outpoint)
s.totalEntryMemory -= entry.memoryUsage()
delete(m, outpoint)

// End this batch when the maximum number of entries per batch has
// been reached.
if nbBatchEntries >= utxoBatchSizeEntries {
break
// End this batch when the maximum number of entries per batch has
// been reached.
if nbBatchEntries >= utxoBatchSizeEntries {
break
}
}
}

return nil
}
for len(s.cachedEntries) > 0 {
log.Tracef("Flushing %d more entries...", len(s.cachedEntries))
for s.cachedEntries.length() > 0 {
log.Tracef("Flushing %d more entries...", s.cachedEntries.length())
err := s.db.Update(func(dbTx database.Tx) error {
return flushBatch(dbTx)
})
Expand All @@ -729,6 +855,8 @@ func (s *utxoCache) flush(bestState *BestState) error {
return err
}

s.cachedEntries.deleteMaps()

log.Debug("Done flushing UTXO cache to disk")
return nil
}
Expand Down
Loading

0 comments on commit b34a14b

Please sign in to comment.