From eb6f2ce0b6cecd5786a0eb93cdec2c2d19e4b4ce Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 16 Dec 2022 15:57:09 +0100 Subject: [PATCH] Add internal iterator to Bucket that goes over buckets. So far the code was frequently traversing all the keys (ignoring the flag that marks a key as a bucket) and trying to open each key as a bucket (seeking the same entry again from scratch). In this proposal, we iterate only through bucket keys. Signed-off-by: Piotr Tabor --- bucket.go | 27 +++++--- bucket_test.go | 164 ++++++++++++++++++++++++++++++++++++------------- cursor.go | 19 ++++-- go.mod | 8 +++ go.sum | 17 +++++ tx.go | 2 +- 6 files changed, 179 insertions(+), 58 deletions(-) diff --git a/bucket.go b/bucket.go index 9dcb938ad..5fb3a3dad 100644 --- a/bucket.go +++ b/bucket.go @@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error { // Recursively delete all child buckets. child := b.Bucket(key) - err := child.ForEach(func(k, v []byte) error { - if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 { - if err := child.DeleteBucket(k); err != nil { - return fmt.Errorf("delete bucket: %s", err) - } + err := child.ForEachBucket(func(k []byte) error { + if err := child.DeleteBucket(k); err != nil { + return fmt.Errorf("delete bucket: %s", err) } return nil }) @@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error { return nil } -// Stat returns stats on a bucket. +func (b *Bucket) ForEachBucket(fn func(k []byte) error) error { + if b.tx.db == nil { + return ErrTxClosed + } + c := b.Cursor() + for k, _, flags := c.first(); k != nil; k, _, flags = c.next() { + if flags&bucketLeafFlag != 0 { + if err := fn(k); err != nil { + return err + } + } + } + return nil +} + +// Stats returns stats on a bucket. func (b *Bucket) Stats() BucketStats { var s, subStats BucketStats pageSize := b.tx.db.pageSize @@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats { // Keep track of maximum page depth. 
if depth+1 > s.Depth { - s.Depth = (depth + 1) + s.Depth = depth + 1 } }) diff --git a/bucket_test.go b/bucket_test.go index 2ac926359..abcf04bbf 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/stretchr/testify/assert" "log" "math/rand" "os" @@ -1003,57 +1004,132 @@ func TestBucket_ForEach(t *testing.T) { db := MustOpenDB() defer db.MustClose() - if err := db.Update(func(tx *bolt.Tx) error { + type kv struct { + k []byte + v []byte + } + + expectedItems := []kv{ + {k: []byte("bar"), v: []byte("0002")}, + {k: []byte("baz"), v: []byte("0001")}, + {k: []byte("csubbucket"), v: nil}, + {k: []byte("foo"), v: []byte("0000")}, + } + + verifyReads := func(b *bolt.Bucket) { + var items []kv + err := b.ForEach(func(k, v []byte) error { + items = append(items, kv{k: k, v: v}) + return nil + }) + assert.NoErrorf(t, err, "b.ForEach failed") + assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put") + } + + err := db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucket([]byte("widgets")) - if err != nil { - t.Fatal(err) - } - if err := b.Put([]byte("foo"), []byte("0000")); err != nil { - t.Fatal(err) - } - if err := b.Put([]byte("baz"), []byte("0001")); err != nil { - t.Fatal(err) - } - if err := b.Put([]byte("bar"), []byte("0002")); err != nil { - t.Fatal(err) - } + assert.NoError(t, err, "bucket creation failed") - var index int - if err := b.ForEach(func(k, v []byte) error { - switch index { - case 0: - if !bytes.Equal(k, []byte("bar")) { - t.Fatalf("unexpected key: %v", k) - } else if !bytes.Equal(v, []byte("0002")) { - t.Fatalf("unexpected value: %v", v) - } - case 1: - if !bytes.Equal(k, []byte("baz")) { - t.Fatalf("unexpected key: %v", k) - } else if !bytes.Equal(v, []byte("0001")) { - t.Fatalf("unexpected value: %v", v) - } - case 2: - if !bytes.Equal(k, []byte("foo")) { - t.Fatalf("unexpected key: %v", k) - } else if !bytes.Equal(v, []byte("0000")) { - 
t.Fatalf("unexpected value: %v", v) - } - } - index++ + assert.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed") + assert.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed") + assert.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed") + _, err = b.CreateBucket([]byte("csubbucket")) + assert.NoErrorf(t, err, "creation of subbucket failed") + + verifyReads(b) + + return nil + }) + db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + assert.NotNil(t, b, "bucket opening failed") + verifyReads(b) + return nil + }) + + assert.NoErrorf(t, err, "db.Update failed") +} + +func TestBucket_ForEachBucket(t *testing.T) { + db := MustOpenDB() + defer db.MustClose() + + expectedItems := [][]byte{ + []byte("csubbucket"), + []byte("zsubbucket"), + } + + verifyReads := func(b *bolt.Bucket) { + var items [][]byte + err := b.ForEachBucket(func(k []byte) error { + items = append(items, k) return nil - }); err != nil { - t.Fatal(err) - } + }) + assert.NoErrorf(t, err, "b.ForEach failed") + assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put") + } - if index != 3 { - t.Fatalf("unexpected index: %d", index) - } + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + assert.NoError(t, err, "bucket creation failed") + + assert.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed") + _, err = b.CreateBucket([]byte("zsubbucket")) + assert.NoErrorf(t, err, "creation of subbucket failed") + assert.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed") + assert.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed") + _, err = b.CreateBucket([]byte("csubbucket")) + assert.NoErrorf(t, err, "creation of subbucket failed") + + verifyReads(b) return nil - }); err != nil { - t.Fatal(err) + }) + db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + assert.NotNil(t, b, "bucket opening failed") + 
verifyReads(b) + return nil + }) + + assert.NoErrorf(t, err, "db.Update failed") +} + +func TestBucket_ForEachBucket_NoBuckets(t *testing.T) { + db := MustOpenDB() + defer db.MustClose() + + verifyReads := func(b *bolt.Bucket) { + var items [][]byte + err := b.ForEachBucket(func(k []byte) error { + items = append(items, k) + return nil + }) + assert.NoErrorf(t, err, "b.ForEach failed") + assert.Emptyf(t, items, "what we iterated (ForEach) is not what we put") } + + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + assert.NoError(t, err, "bucket creation failed") + + assert.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed") + assert.NoErrorf(t, err, "creation of subbucket failed") + assert.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed") + assert.NoErrorf(t, err, "creation of subbucket failed") + + verifyReads(b) + + return nil + }) + db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + assert.NotNil(t, b, "bucket opening failed") + verifyReads(b) + return nil + }) + + assert.NoErrorf(t, err, "db.Update failed") } // Ensure a database can stop iteration early. diff --git a/cursor.go b/cursor.go index ba6a8345d..448afda44 100644 --- a/cursor.go +++ b/cursor.go @@ -29,11 +29,19 @@ func (c *Cursor) Bucket() *Bucket { // If the bucket is empty then a nil key and value are returned. // The returned key and value are only valid for the life of the transaction. func (c *Cursor) First() (key []byte, value []byte) { + k, v, flags := c.first() + if (flags & uint32(bucketLeafFlag)) != 0 { + return k, nil + } + return k, v +} + +func (c *Cursor) first() (key []byte, value []byte, flags uint32) { _assert(c.bucket.tx.db != nil, "tx closed") c.stack = c.stack[:0] p, n := c.bucket.pageNode(c.bucket.root) c.stack = append(c.stack, elemRef{page: p, node: n, index: 0}) - c.first() + c.goToFirstElementOnTheStack() // If we land on an empty page then move to the next value. 
// https://github.com/boltdb/bolt/issues/450 @@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) { k, v, flags := c.keyValue() if (flags & uint32(bucketLeafFlag)) != 0 { - return k, nil + return k, nil, flags } - return k, v - + return k, v, flags } // Last moves the cursor to the last item in the bucket and returns its key and value. @@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) { } -// first moves the cursor to the first leaf element under the last page in the stack. -func (c *Cursor) first() { +// goToFirstElementOnTheStack moves the cursor to the first leaf element under the last page in the stack. +func (c *Cursor) goToFirstElementOnTheStack() { for { // Exit when we hit a leaf page. var ref = &c.stack[len(c.stack)-1] @@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) { // Otherwise start from where we left off in the stack and find the // first element of the first leaf page. c.stack = c.stack[:i+1] - c.first() + c.goToFirstElementOnTheStack() // If this is an empty page then restart and move back up the stack. 
// https://github.com/boltdb/bolt/issues/450 diff --git a/go.mod b/go.mod index cbafe64fd..6f7d088c2 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,11 @@ module go.etcd.io/bbolt go 1.17 require golang.org/x/sys v0.2.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index beac707cf..30ccca85c 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,19 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 
v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tx.go b/tx.go index 451cc2c98..15f913ab7 100644 --- a/tx.go +++ b/tx.go @@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo }) // Check each bucket within this bucket. - _ = b.ForEach(func(k, v []byte) error { + _ = b.ForEachBucket(func(k []byte) error { if child := b.Bucket(k); child != nil { tx.checkBucket(child, reachable, freed, ch) }