Skip to content

Commit

Permalink
use vectorized io when writing dirty pages
Browse files Browse the repository at this point in the history
This makes use of the vectorized IO syscall pwritev to write runs of
consecutive pages in batches of up to 1024 smaller buffer writes each.

Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
  • Loading branch information
tjungblu committed Mar 17, 2023
1 parent 3f572b4 commit cd85a1e
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 31 deletions.
57 changes: 57 additions & 0 deletions bolt_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,63 @@ import (
"go.etcd.io/bbolt/internal/common"
)

// pwritev writes the given pages using vectorized io. The pages must be sorted by their id indicating their sequence.
// The logic identifies runs of consecutive pages that are written with vectorized io using the pwritev syscall.
// pwritev writes the given pages using vectorized io. The pages must be sorted by their id indicating their sequence.
// The logic identifies runs of consecutive pages that are written with vectorized io using the pwritev syscall,
// issuing one syscall per run (see pagesToIovec for how runs and batches are formed).
func pwritev(db *DB, pages []*common.Page) error {
	if len(pages) == 0 {
		return nil
	}

	offsets, iovecs := pagesToIovec(db, pages)

	for i := 0; i < len(offsets); i++ {
		// The raw SYS_PWRITEV syscall takes the file offset split into a low
		// and a high half (pos_l, pos_h); the kernel reassembles them as
		// (pos_h << 32) | pos_l on 32-bit platforms and ignores pos_h on
		// 64-bit ones. The high half is therefore offset>>32 (not >>8).
		_, _, e := syscall.Syscall6(syscall.SYS_PWRITEV, db.file.Fd(),
			uintptr(unsafe.Pointer(&iovecs[i][0])), uintptr(len(iovecs[i])),
			uintptr(offsets[i]), uintptr(offsets[i]>>32), 0)
		if e != 0 {
			return e
		}
	}

	return nil
}

// pagesToIovec groups the (id-sorted) dirty pages into runs of physically
// consecutive pages and converts each run into a batch of iovec entries.
// It returns, per batch, the byte offset of the batch's first page in the
// file and the iovec slice describing the buffers to write there. A batch
// never exceeds maxVec entries, so a very long run is split into several
// batches. pages must be non-empty.
func pagesToIovec(db *DB, pages []*common.Page) ([]uint64, [][]syscall.Iovec) {
	// TODO: read this from sysconf(_SC_IOV_MAX)?
	// linux and darwin default is 1024
	const maxVec = 1024

	var (
		offsets []uint64
		iovecs  [][]syscall.Iovec
		batch   []syscall.Iovec
	)

	first := 0                // index into pages of the current batch's first page
	nextPid := pages[0].Id()  // page id that would continue the current run
	pageSize := uint64(db.pageSize)

	for idx, p := range pages {
		// Start a new batch when the consecutive run breaks or the vector
		// limit for a single syscall is reached.
		if p.Id() != nextPid || len(batch) >= maxVec {
			offsets = append(offsets, uint64(pages[first].Id())*pageSize)
			iovecs = append(iovecs, batch)
			first = idx
			batch = nil
		}
		batch = append(batch, syscall.Iovec{
			Base: (*byte)(unsafe.Pointer(p)),
			Len:  (uint64(p.Overflow()) + 1) * pageSize,
		})
		// A page with overflow occupies Overflow()+1 consecutive page ids.
		nextPid = p.Id() + common.Pgid(p.Overflow()) + 1
	}

	// Flush the trailing batch.
	if len(batch) > 0 {
		offsets = append(offsets, uint64(pages[first].Id())*pageSize)
		iovecs = append(iovecs, batch)
	}
	return offsets, iovecs
}

// flock acquires an advisory lock on a file descriptor.
func flock(db *DB, exclusive bool, timeout time.Duration) error {
var t time.Time
Expand Down
35 changes: 35 additions & 0 deletions bolt_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,41 @@ import (
"go.etcd.io/bbolt/internal/common"
)

// pwritev on Windows does not make use of vectorized io, it merely writes the bytes using the db ops.writeAt.
// The pages must be sorted by their id indicating their sequence.
// pwritev on Windows does not make use of vectorized io, it merely writes the bytes using the db ops.writeAt.
// The pages must be sorted by their id indicating their sequence, matching the contract of the unix variant.
// It takes []*common.Page (as passed by Tx.write via common.Pages) and uses the common.Page accessor API,
// consistent with the unix implementation.
func pwritev(db *DB, pages []*common.Page) error {
	for _, p := range pages {
		// A page and its overflow pages form one contiguous on-disk region.
		rem := (uint64(p.Overflow()) + 1) * uint64(db.pageSize)
		offset := int64(p.Id()) * int64(db.pageSize)
		var written uintptr

		// Write out page in "max allocation" sized chunks.
		for {
			sz := rem
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}
			buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

			if _, err := db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Exit inner for loop if we've written all the chunks.
			rem -= sz
			if rem == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			written += uintptr(sz)
		}
	}

	return nil
}

// fdatasync flushes written data to a file descriptor.
func fdatasync(db *DB) error {
return db.file.Sync()
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
Expand Down
38 changes: 8 additions & 30 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ func (tx *Tx) allocate(count int) (*common.Page, error) {

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
if len(tx.pages) == 0 {
return nil
}

// Sort pages by id.
pages := make(common.Pages, 0, len(tx.pages))
for _, p := range tx.pages {
Expand All @@ -435,38 +439,12 @@ func (tx *Tx) write() error {
sort.Sort(pages)

// Write pages to disk in order.
for _, p := range pages {
rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize)
offset := int64(p.Id()) * int64(tx.db.pageSize)
var written uintptr

// Write out page in "max allocation" sized chunks.
for {
sz := rem
if sz > maxAllocSize-1 {
sz = maxAllocSize - 1
}
buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}

// Update statistics.
tx.stats.IncWrite(1)

// Exit inner for loop if we've written all the chunks.
rem -= sz
if rem == 0 {
break
}

// Otherwise move offset forward and move pointer to next chunk.
offset += int64(sz)
written += uintptr(sz)
}
if err := pwritev(tx.db, pages); err != nil {
return err
}

tx.stats.IncWrite(1)

// Ignore file sync if flag is set on DB.
if !tx.db.NoSync || common.IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
Expand Down

0 comments on commit cd85a1e

Please sign in to comment.