Skip to content

Commit

Permalink
use vectorized io when writing dirty pages
Browse files Browse the repository at this point in the history
This makes use of the vectorized IO call pwritev to write runs of
consecutive pages in batches of up to 1024 smaller buffer writes each.

Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
  • Loading branch information
tjungblu committed Dec 1, 2022
1 parent eedea6c commit 530b1c5
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 33 deletions.
57 changes: 57 additions & 0 deletions bolt_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,63 @@ import (
"golang.org/x/sys/unix"
)

// pwritev writes the given pages using vectorized io. The pages must be sorted by their id indicating their sequence.
// The logic identifies runs of consecutive pages that are written with vectorized io using the pwritev syscall.
func pwritev(db *DB, pages []*page) error {
	if len(pages) == 0 {
		return nil
	}

	// Each entry in offsets/iovecs is one contiguous run of pages.
	offsets, iovecs := pagesToIovec(db, pages)

	for i := 0; i < len(offsets); i++ {
		// The raw pwritev syscall takes the file offset split into a low and a
		// high half (pos_l, pos_h). On 64-bit platforms the kernel ignores the
		// high half; on 32-bit platforms it carries bits 32-63 of the offset,
		// hence the shift by 32. (Shifting by 8 would corrupt offsets beyond
		// 4GiB on 32-bit systems.)
		_, _, e := syscall.Syscall6(syscall.SYS_PWRITEV, db.file.Fd(), uintptr(unsafe.Pointer(&iovecs[i][0])),
			uintptr(len(iovecs[i])), uintptr(offsets[i]), uintptr(offsets[i]>>32), 0)
		if e != 0 {
			return e
		}
	}

	return nil
}

// pagesToIovec groups the (id-sorted) pages into runs of consecutive pages and
// converts each run into a slice of iovecs plus the file offset of the run's
// first page. Runs are additionally capped at maxVec entries so each resulting
// batch can be handed to a single pwritev call.
func pagesToIovec(db *DB, pages []*page) ([]uint64, [][]syscall.Iovec) {
	// TODO: read this from sysconf(_SC_IOV_MAX)?
	// linux and darwin default is 1024
	const maxVec = 1024

	var offsets []uint64
	var iovecs [][]syscall.Iovec

	pageSize := uint64(db.pageSize)
	runStart := 0
	expected := pages[0].id // id the next page must have to extend the run
	var batch []syscall.Iovec

	for i := 0; i < len(pages); i++ {
		p := pages[i]
		// Flush the current batch when the page is not contiguous with the
		// previous one or the iovec limit has been reached.
		if p.id != expected || len(batch) >= maxVec {
			offsets = append(offsets, uint64(pages[runStart].id)*pageSize)
			iovecs = append(iovecs, batch)

			runStart = i
			batch = []syscall.Iovec{}
		}
		batch = append(batch, syscall.Iovec{
			Base: (*byte)(unsafe.Pointer(p)),
			Len:  (uint64(p.overflow) + 1) * pageSize,
		})
		// Overflow pages are part of this page, so the next consecutive
		// page id starts after them.
		expected = p.id + pgid(p.overflow) + 1
	}

	if len(batch) > 0 {
		offsets = append(offsets, uint64(pages[runStart].id)*pageSize)
		iovecs = append(iovecs, batch)
	}
	return offsets, iovecs
}

// flock acquires an advisory lock on a file descriptor.
func flock(db *DB, exclusive bool, timeout time.Duration) error {
var t time.Time
Expand Down
35 changes: 35 additions & 0 deletions bolt_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,41 @@ import (
"golang.org/x/sys/windows"
)

// pwritev on Windows does not make use of vectorized io, it merely writes the bytes using the db ops.writeAt.
// The pages must be sorted by their id indicating their sequence.
func pwritev(db *DB, pages []*page) error {
	pageSize := uint64(db.pageSize)
	for _, p := range pages {
		remaining := (uint64(p.overflow) + 1) * pageSize
		pos := int64(p.id) * int64(pageSize)
		var done uintptr

		// Write the page out in "max allocation" sized chunks; remaining is
		// at least one page, so the loop body runs at least once.
		for remaining > 0 {
			chunk := remaining
			if chunk > maxAllocSize-1 {
				chunk = maxAllocSize - 1
			}
			buf := unsafeByteSlice(unsafe.Pointer(p), done, 0, int(chunk))

			if _, err := db.ops.writeAt(buf, pos); err != nil {
				return err
			}

			// Advance both the file offset and the in-memory cursor to the
			// next chunk.
			remaining -= chunk
			pos += int64(chunk)
			done += uintptr(chunk)
		}
	}

	return nil
}

// fdatasync flushes written data to a file descriptor.
func fdatasync(db *DB) error {
return db.file.Sync()
Expand Down
44 changes: 11 additions & 33 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,48 +511,26 @@ func (tx *Tx) allocate(count int) (*page, error) {

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
if len(tx.pages) == 0 {
return nil
}

// Sort pages by id.
pages := make(pages, 0, len(tx.pages))
for _, p := range tx.pages {
pages = append(pages, p)
}
// Clear out page cache early.
tx.pages = make(map[pgid]*page)
sort.Sort(pages)

// Write pages to disk in order.
for _, p := range pages {
rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
offset := int64(p.id) * int64(tx.db.pageSize)
var written uintptr

// Write out page in "max allocation" sized chunks.
for {
sz := rem
if sz > maxAllocSize-1 {
sz = maxAllocSize - 1
}
buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}

// Update statistics.
tx.stats.Write++

// Exit inner for loop if we've written all the chunks.
rem -= sz
if rem == 0 {
break
}

// Otherwise move offset forward and move pointer to next chunk.
offset += int64(sz)
written += uintptr(sz)
}
// Clear out page cache early.
tx.pages = make(map[pgid]*page)
// Write pages to disk.
if err := pwritev(tx.db, pages); err != nil {
return err
}

tx.stats.Write++

// Ignore file sync if flag is set on DB.
if !tx.db.NoSync || IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
Expand Down

0 comments on commit 530b1c5

Please sign in to comment.