From ebca452da79ad504fe61abb944eddfc140f9301b Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 16 Dec 2022 15:57:09 +0100 Subject: [PATCH] Add internal iterator to Bucket that goes over buckets. So far the code was frequently traversing all the keys (ignoring the flag that says whether a key is a bucket) and trying to open each of the keys as a bucket (seeking the same entry from scratch). In this proposal, we iterate only through bucket keys. Signed-off-by: Piotr Tabor --- bucket.go | 27 +++++--- bucket_test.go | 167 ++++++++++++++++++++++++++++++++++++------------- cursor.go | 19 ++++-- go.mod | 2 +- go.sum | 4 +- tx.go | 2 +- 6 files changed, 159 insertions(+), 62 deletions(-) diff --git a/bucket.go b/bucket.go index 477662943..5fe64909a 100644 --- a/bucket.go +++ b/bucket.go @@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error { // Recursively delete all child buckets. child := b.Bucket(key) - err := child.ForEach(func(k, v []byte) error { - if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 { - if err := child.DeleteBucket(k); err != nil { - return fmt.Errorf("delete bucket: %s", err) - } + err := child.ForEachBucket(func(k []byte) error { + if err := child.DeleteBucket(k); err != nil { + return fmt.Errorf("delete bucket: %s", err) } return nil }) @@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error { return nil } -// Stat returns stats on a bucket. +func (b *Bucket) ForEachBucket(fn func(k []byte) error) error { + if b.tx.db == nil { + return ErrTxClosed + } + c := b.Cursor() + for k, _, flags := c.first(); k != nil; k, _, flags = c.next() { + if flags&bucketLeafFlag != 0 { + if err := fn(k); err != nil { + return err + } + } + } + return nil +} + +// Stats returns stats on a bucket. func (b *Bucket) Stats() BucketStats { var s, subStats BucketStats pageSize := b.tx.db.pageSize @@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats { // Keep track of maximum page depth. 
if depth+1 > s.Depth { - s.Depth = (depth + 1) + s.Depth = depth + 1 } }) diff --git a/bucket_test.go b/bucket_test.go index 2c53dba93..66f9a1fa0 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "log" "math/rand" "os" @@ -13,7 +15,6 @@ import ( "testing" "testing/quick" - "github.com/stretchr/testify/assert" bolt "go.etcd.io/bbolt" ) @@ -1005,57 +1006,133 @@ func TestBucket_ForEach(t *testing.T) { db := MustOpenDB() defer db.MustClose() - if err := db.Update(func(tx *bolt.Tx) error { + type kv struct { + k []byte + v []byte + } + + expectedItems := []kv{ + {k: []byte("bar"), v: []byte("0002")}, + {k: []byte("baz"), v: []byte("0001")}, + {k: []byte("csubbucket"), v: nil}, + {k: []byte("foo"), v: []byte("0000")}, + } + + verifyReads := func(b *bolt.Bucket) { + var items []kv + err := b.ForEach(func(k, v []byte) error { + items = append(items, kv{k: k, v: v}) + return nil + }) + assert.NoErrorf(t, err, "b.ForEach failed") + assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put") + } + + err := db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucket([]byte("widgets")) - if err != nil { - t.Fatal(err) - } - if err := b.Put([]byte("foo"), []byte("0000")); err != nil { - t.Fatal(err) - } - if err := b.Put([]byte("baz"), []byte("0001")); err != nil { - t.Fatal(err) - } - if err := b.Put([]byte("bar"), []byte("0002")); err != nil { - t.Fatal(err) - } + require.NoError(t, err, "bucket creation failed") - var index int - if err := b.ForEach(func(k, v []byte) error { - switch index { - case 0: - if !bytes.Equal(k, []byte("bar")) { - t.Fatalf("unexpected key: %v", k) - } else if !bytes.Equal(v, []byte("0002")) { - t.Fatalf("unexpected value: %v", v) - } - case 1: - if !bytes.Equal(k, []byte("baz")) { - t.Fatalf("unexpected key: %v", k) - } else if !bytes.Equal(v, []byte("0001")) { - t.Fatalf("unexpected 
value: %v", v) - } - case 2: - if !bytes.Equal(k, []byte("foo")) { - t.Fatalf("unexpected key: %v", k) - } else if !bytes.Equal(v, []byte("0000")) { - t.Fatalf("unexpected value: %v", v) - } - } - index++ + require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed") + require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed") + require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed") + _, err = b.CreateBucket([]byte("csubbucket")) + require.NoErrorf(t, err, "creation of subbucket failed") + + verifyReads(b) + + return nil + }) + require.NoErrorf(t, err, "db.Update failed") + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + require.NotNil(t, b, "bucket opening failed") + verifyReads(b) + return nil + }) + assert.NoErrorf(t, err, "db.View failed") +} + +func TestBucket_ForEachBucket(t *testing.T) { + db := MustOpenDB() + defer db.MustClose() + + expectedItems := [][]byte{ + []byte("csubbucket"), + []byte("zsubbucket"), + } + + verifyReads := func(b *bolt.Bucket) { + var items [][]byte + err := b.ForEachBucket(func(k []byte) error { + items = append(items, k) return nil - }); err != nil { - t.Fatal(err) - } + }) + assert.NoErrorf(t, err, "b.ForEach failed") + assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put") + } - if index != 3 { - t.Fatalf("unexpected index: %d", index) - } + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + require.NoError(t, err, "bucket creation failed") + + require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed") + _, err = b.CreateBucket([]byte("zsubbucket")) + require.NoErrorf(t, err, "creation of subbucket failed") + require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed") + require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed") + _, err = b.CreateBucket([]byte("csubbucket")) + require.NoErrorf(t, err, "creation of 
subbucket failed") + + verifyReads(b) return nil - }); err != nil { - t.Fatal(err) + }) + assert.NoErrorf(t, err, "db.Update failed") + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + require.NotNil(t, b, "bucket opening failed") + verifyReads(b) + return nil + }) + assert.NoErrorf(t, err, "db.View failed") +} + +func TestBucket_ForEachBucket_NoBuckets(t *testing.T) { + db := MustOpenDB() + defer db.MustClose() + + verifyReads := func(b *bolt.Bucket) { + var items [][]byte + err := b.ForEachBucket(func(k []byte) error { + items = append(items, k) + return nil + }) + assert.NoErrorf(t, err, "b.ForEach failed") + assert.Emptyf(t, items, "what we iterated (ForEach) is not what we put") } + + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + require.NoError(t, err, "bucket creation failed") + + require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed") + require.NoErrorf(t, err, "creation of subbucket failed") + require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed") + require.NoErrorf(t, err, "creation of subbucket failed") + + verifyReads(b) + + return nil + }) + require.NoErrorf(t, err, "db.Update failed") + + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + require.NotNil(t, b, "bucket opening failed") + verifyReads(b) + return nil + }) + assert.NoErrorf(t, err, "db.View failed") } // Ensure a database can stop iteration early. diff --git a/cursor.go b/cursor.go index f22aa6457..0bddb54f5 100644 --- a/cursor.go +++ b/cursor.go @@ -30,10 +30,18 @@ func (c *Cursor) Bucket() *Bucket { // The returned key and value are only valid for the life of the transaction. 
func (c *Cursor) First() (key []byte, value []byte) { _assert(c.bucket.tx.db != nil, "tx closed") + k, v, flags := c.first() + if (flags & uint32(bucketLeafFlag)) != 0 { + return k, nil + } + return k, v +} + +func (c *Cursor) first() (key []byte, value []byte, flags uint32) { c.stack = c.stack[:0] p, n := c.bucket.pageNode(c.bucket.root) c.stack = append(c.stack, elemRef{page: p, node: n, index: 0}) - c.first() + c.goToFirstElementOnTheStack() // If we land on an empty page then move to the next value. // https://github.com/boltdb/bolt/issues/450 @@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) { k, v, flags := c.keyValue() if (flags & uint32(bucketLeafFlag)) != 0 { - return k, nil + return k, nil, flags } - return k, v - + return k, v, flags } // Last moves the cursor to the last item in the bucket and returns its key and value. @@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) { } // first moves the cursor to the first leaf element under the last page in the stack. -func (c *Cursor) first() { +func (c *Cursor) goToFirstElementOnTheStack() { for { // Exit when we hit a leaf page. var ref = &c.stack[len(c.stack)-1] @@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) { // Otherwise start from where we left off in the stack and find the // first element of the first leaf page. c.stack = c.stack[:i+1] - c.first() + c.goToFirstElementOnTheStack() // If this is an empty page then restart and move back up the stack. 
// https://github.com/boltdb/bolt/issues/450 diff --git a/go.mod b/go.mod index 0bdc35fcc..a58befa15 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/stretchr/testify v1.8.1 - golang.org/x/sys v0.2.0 + golang.org/x/sys v0.3.0 ) require ( diff --git a/go.sum b/go.sum index 587354659..3d9dedd3e 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= -golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tx.go b/tx.go index 70667abc2..2cc180aa8 100644 --- a/tx.go +++ b/tx.go @@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo }) // Check each bucket within this bucket. - _ = b.ForEach(func(k, v []byte) error { + _ = b.ForEachBucket(func(k []byte) error { if child := b.Bucket(k); child != nil { tx.checkBucket(child, reachable, freed, ch) }