Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close the pruning process properly #967

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

- [#965](https://github.com/cosmos/iavl/pull/965) Use expected interface for expected IAVL `Logger`.
- [#967](https://github.com/cosmos/iavl/pull/967) Close the pruning process when the nodeDB is closed.

## v1.2.0 May 13, 2024

Expand Down
53 changes: 36 additions & 17 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package iavl

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
Expand Down Expand Up @@ -71,9 +72,12 @@ var (
var errInvalidFastStorageVersion = fmt.Errorf("fast storage version must be in the format <storage version>%s<latest fast cache version>", fastStorageVersionDelimiter)

type nodeDB struct {
ctx context.Context
cancel context.CancelFunc
logger Logger

mtx sync.Mutex // Read/write lock.
done chan struct{} // Channel to signal that the pruning process is done.
db dbm.DB // Persistent node storage.
batch dbm.Batch // Batched writing buffer.
opts Options // Options to customize for pruning/writing
Expand All @@ -91,12 +95,15 @@ type nodeDB struct {

func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
storeVersion, err := db.Get(metadataKeyFormat.Key([]byte(storageVersionKey)))

if err != nil || storeVersion == nil {
storeVersion = []byte(defaultStorageVersionValue)
}

ctx, cancel := context.WithCancel(context.Background())

ndb := &nodeDB{
ctx: ctx,
cancel: cancel,
logger: lg,
db: db,
batch: NewBatchWithFlusher(db, opts.FlushThreshold),
Expand All @@ -113,6 +120,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
}

if opts.AsyncPruning {
ndb.done = make(chan struct{})
go ndb.startPruning()
}

Expand Down Expand Up @@ -578,26 +586,32 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error {
// startPruning starts the pruning process.
func (ndb *nodeDB) startPruning() {
for {
ndb.mtx.Lock()
toVersion := ndb.pruneVersion
ndb.mtx.Unlock()
select {
case <-ndb.ctx.Done(): // ndb is closed
ndb.done <- struct{}{}
return
default:
ndb.mtx.Lock()
toVersion := ndb.pruneVersion
ndb.mtx.Unlock()

if toVersion == 0 {
time.Sleep(100 * time.Millisecond)
continue
}
if toVersion == 0 {
time.Sleep(100 * time.Millisecond)
continue
}

if err := ndb.deleteVersionsTo(toVersion); err != nil {
ndb.logger.Error("Error while pruning", "err", err)
time.Sleep(1 * time.Second)
continue
}
if err := ndb.deleteVersionsTo(toVersion); err != nil {
ndb.logger.Error("Error while pruning", "err", err)
time.Sleep(1 * time.Second)
continue
}

ndb.mtx.Lock()
if ndb.pruneVersion <= toVersion {
ndb.pruneVersion = 0
ndb.mtx.Lock()
if ndb.pruneVersion <= toVersion {
ndb.pruneVersion = 0
}
ndb.mtx.Unlock()
}
ndb.mtx.Unlock()
}
}

Expand Down Expand Up @@ -1095,6 +1109,11 @@ func (ndb *nodeDB) Close() error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()

ndb.cancel()
if ndb.opts.AsyncPruning {
<-ndb.done // wait for the pruning process to finish
}

if ndb.batch != nil {
if err := ndb.batch.Close(); err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,12 @@ func TestDeleteVersionsFromNoDeadlock(t *testing.T) {
require.Error(t, err, "")
require.Contains(t, err.Error(), fmt.Sprintf("unable to delete version %v with 2 active readers", targetVersion+2))
}

func TestCloseNodeDB(t *testing.T) {
db := dbm.NewMemDB()
defer db.Close()
opts := DefaultOptions()
opts.AsyncPruning = true
ndb := newNodeDB(db, 0, opts, NewNopLogger())
require.NoError(t, ndb.Close())
}
Loading