storage/engine: use SyncWAL instead of sync on commit

The previous implementation of batching for synced commits had the
unfortunate property that commits which did not require syncing still
had to wait for the WAL to be synced. When RocksDB writes a batch with
sync==true it internally does:

  1. Add batch to WAL
  2. Sync WAL
  3. Add entries to mem table

Switch to using SyncWAL to explicitly sync the WAL after writing a
batch. This has slightly different semantics from the above:

  1. Add batch to WAL
  2. Add entries to mem table
  3. Sync WAL

The advantage of this new approach is that non-synced batches do not
have to wait for the WAL to sync. Prior to this change, it was observed
that essentially every batch was waiting for a WAL sync. Approximately
half of all batch commits are performed with sync==true (half of all
batch commits are for Raft log entries or Raft state). Forcing the
non-synced commits to wait for the WAL sync added significant latency.
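
For illustration, a minimal Go sketch of the two orderings (the engine
interface and function names here are stand-ins, not the actual
storage/engine or RocksDB API):

  package sketch

  // engine is an illustrative stand-in for the write API described above.
  type engine interface {
      // Write appends a batch to the WAL (optionally fsyncing it) and then
      // applies the batch to the mem table.
      Write(batch []byte, sync bool) error
      // SyncWAL fsyncs the WAL.
      SyncWAL() error
  }

  // commitOld mirrors the previous behavior: sync==true makes the fsync
  // happen between the WAL append and the mem table insert, so every
  // batch in the write group waits for it.
  func commitOld(e engine, batch []byte, sync bool) error {
      return e.Write(batch, sync)
  }

  // commitNew mirrors the new behavior: the write itself is never synced,
  // and a separate SyncWAL call is issued only for batches that asked for
  // durability.
  func commitNew(e engine, batch []byte, sync bool) error {
      if err := e.Write(batch, false /* sync */); err != nil {
          return err
      }
      if !sync {
          return nil
      }
      return e.SyncWAL()
  }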

Reworked the implementation of batch grouping. The sequence number
mechanism was replaced by a per-batch sync.WaitGroup embedded in the
rocksDBBatch structure. Once a batch is committed (and synced, if
requested), its wait group is signalled, ensuring that only the desired
goroutine is woken instead of waking all of the waiting goroutines as
the previous implementation did. Syncing of the WAL is performed by a
dedicated goroutine, and only the batches which specify sync==true wait
for the sync to occur.
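
For illustration, a condensed Go sketch of that grouping (simplified
from the rocksDBBatch code in the diff below; writeGroup and
queueForSync are stand-ins for the single unsynced RocksDB write and
the hand-off to the sync goroutine):

  package sketch

  import "sync"

  type batch struct {
      syncCommit bool
      commitErr  error
      commitWG   sync.WaitGroup // signalled once the commit (and sync) is done
  }

  type committer struct {
      sync.Mutex
      cond       sync.Cond
      committing bool
      pending    []*batch
  }

  func newCommitter() *committer {
      c := &committer{}
      c.cond.L = &c.Mutex
      return c
  }

  // commit groups concurrent commits: the first batch to arrive at an
  // empty pending list becomes the leader, writes the whole group in a
  // single unsynced write, and hands any batches that requested
  // sync==true to the WAL-sync goroutine. Each batch waits only on its
  // own WaitGroup, so exactly one goroutine is woken per batch.
  func (c *committer) commit(
      b *batch, writeGroup func([]*batch) error, queueForSync func([]*batch),
  ) error {
      b.commitWG.Add(1)

      c.Lock()
      leader := len(c.pending) == 0
      c.pending = append(c.pending, b)

      if leader {
          // Wait for the previous group's commit to finish.
          for c.committing {
              c.cond.Wait()
          }
          pending := c.pending
          c.pending = nil
          c.committing = true
          c.Unlock()

          err := writeGroup(pending) // one unsynced write for the group

          // Let the next leader proceed.
          c.Lock()
          c.committing = false
          c.cond.Signal()
          c.Unlock()

          // Unsynced batches (and every batch on error) are released here;
          // the rest are released by the sync goroutine after SyncWAL.
          var syncing []*batch
          for _, p := range pending {
              if err != nil || !p.syncCommit {
                  p.commitErr = err
                  p.commitWG.Done()
              } else {
                  syncing = append(syncing, p)
              }
          }
          if len(syncing) > 0 {
              queueForSync(syncing)
          }
      } else {
          c.Unlock()
      }

      b.commitWG.Wait()
      return b.commitErr
  }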

Added "rocksdb.min_wal_sync_interval" which adds specifies a minimum
delay to wait between calls to SyncWAL. The default of 1ms was
experimentally determined to reduce disk IOs by ~50% (vs 0ms) while leaving write
throughput unchanged.
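
The rate limiting itself is a small amount of bookkeeping in the sync
goroutine. Continuing the sketch above (same batch type; syncWAL stands
in for the actual DBSyncWAL cgo call):

  package sketch

  import (
      "sync"
      "time"
  )

  type syncer struct {
      sync.Mutex
      cond    sync.Cond
      closed  bool
      pending []*batch // batch as defined in the grouping sketch above
  }

  func newSyncer() *syncer {
      s := &syncer{}
      s.cond.L = &s.Mutex
      return s
  }

  // syncLoop drains batches queued for syncing and calls syncWAL at most
  // once per minInterval; a single sync covers every batch drained in
  // that round.
  func syncLoop(s *syncer, minInterval time.Duration, syncWAL func() error) {
      s.Lock()
      defer s.Unlock()

      var lastSync time.Time
      for {
          for len(s.pending) == 0 && !s.closed {
              s.cond.Wait()
          }
          if s.closed {
              return
          }

          // Enforce the minimum interval between WAL syncs. Sleeping here
          // lets more batches accumulate, so the next sync covers all of
          // them.
          if delta := time.Since(lastSync); delta < minInterval {
              s.Unlock()
              time.Sleep(minInterval - delta)
              s.Lock()
          }

          pending := s.pending
          s.pending = nil

          s.Unlock()
          err := syncWAL()
          lastSync = time.Now()

          // Release every batch covered by this single WAL sync.
          for _, b := range pending {
              b.commitErr = err
              b.commitWG.Done()
          }
          s.Lock()
      }
  }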

For a write-only workload (`ycsb -workload F`), write throughput
improved by 15%. Raft command commit latency (unsynced commits) dropped
from 30-40ms to 6-10ms.
petermattis committed Jul 10, 2017
1 parent 7a17925 commit a4d1f55
Showing 4 changed files with 130 additions and 43 deletions.
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
@@ -62,6 +62,7 @@ kv.raft_log.synchronize true b set to t
kv.snapshot_rebalance.max_rate 2.0 MiB z the rate limit (bytes/sec) to use for rebalance snapshots
kv.snapshot_recovery.max_rate 8.0 MiB z the rate limit (bytes/sec) to use for recovery snapshots
kv.transaction.max_intents 100000 i maximum number of write intents allowed for a KV transaction
rocksdb.min_wal_sync_interval 1ms d minimum duration between syncs of the RocksDB WAL
server.declined_reservation_timeout 1s d the amount of time to consider the store throttled for up-replication after a reservation was declined
server.failed_reservation_timeout 5s d the amount of time to consider the store throttled for up-replication after a failed reservation call
server.remote_debugging.mode local s set to enable remote debugging, localhost-only or disable (any, local, off)
4 changes: 4 additions & 0 deletions pkg/storage/engine/db.cc
@@ -1667,6 +1667,10 @@ DBStatus DBFlush(DBEngine* db) {
return ToDBStatus(db->rep->Flush(options));
}

DBStatus DBSyncWAL(DBEngine* db) {
return ToDBStatus(db->rep->SyncWAL());
}

DBStatus DBCompact(DBEngine* db) {
rocksdb::CompactRangeOptions options;
// By default, RocksDB doesn't recompact the bottom level (unless
4 changes: 4 additions & 0 deletions pkg/storage/engine/db.h
@@ -102,6 +102,10 @@ void DBClose(DBEngine* db);
// complete.
DBStatus DBFlush(DBEngine* db);

// Syncs the RocksDB WAL ensuring all data is persisted to
// disk. Blocks until the operation is complete.
DBStatus DBSyncWAL(DBEngine* db);

// Forces an immediate compaction over all keys.
DBStatus DBCompact(DBEngine* db);

164 changes: 121 additions & 43 deletions pkg/storage/engine/rocksdb.go
@@ -39,6 +39,7 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -63,6 +64,11 @@ import (
// #include "db.h"
import "C"

var minWALSyncInterval = settings.RegisterDurationSetting(
"rocksdb.min_wal_sync_interval",
"minimum duration between syncs of the RocksDB WAL",
1*time.Millisecond)

//export rocksDBLog
func rocksDBLog(s *C.char, n C.int) {
// Note that rocksdb logging is only enabled if log.V(3) is true
@@ -92,7 +98,7 @@ const (
// max_open_files option. If more file descriptors are available than the
// recommended number, than the default value is used.
RecommendedMaxOpenFiles = 10000
// MinimumMaxOpenFiles is The minimum value that rocksDB's max_open_files
// MinimumMaxOpenFiles is the minimum value that rocksDB's max_open_files
// option can be set to. While this should be set as high as possible, the
// minimum total for a single store node must be under 2048 for Windows
// compatibility. See:
@@ -294,16 +300,19 @@ type RocksDB struct {
cache RocksDBCache // Shared cache.
maxSize int64 // Used for calculating rebalancing and free space.
maxOpenFiles int // The maximum number of open files this instance will use.
deallocated chan struct{} // Closed when the underlying handle is deallocated.

commit struct {
syncutil.Mutex
cond *sync.Cond
committing bool
commitSeq uint64
pendingSeq uint64
pendingSync bool
pending []*rocksDBBatch
cond sync.Cond
committing bool
pending []*rocksDBBatch
}

syncer struct {
syncutil.Mutex
cond sync.Cond
closed bool
pending []*rocksDBBatch
}
}

@@ -328,7 +337,6 @@ func NewRocksDB(
cache: cache.ref(),
maxSize: maxSize,
maxOpenFiles: maxOpenFiles,
deallocated: make(chan struct{}),
}

auxDir := filepath.Join(dir, "auxiliary")
@@ -346,9 +354,8 @@ func newMemRocksDB(attrs roachpb.Attributes, cache RocksDBCache, maxSize int64)
r := &RocksDB{
attrs: attrs,
// dir: empty dir == "mem" RocksDB instance.
cache: cache.ref(),
maxSize: maxSize,
deallocated: make(chan struct{}),
cache: cache.ref(),
maxSize: maxSize,
}

if err := r.SetAuxiliaryDir(os.TempDir()); err != nil {
@@ -415,16 +422,65 @@ func (r *RocksDB) open() error {
}
}

r.commit.cond = sync.NewCond(&r.commit.Mutex)
r.commit.cond.L = &r.commit.Mutex
r.syncer.cond.L = &r.syncer.Mutex

// Start a goroutine that will finish when the underlying handle
// is deallocated. This is used to check a leak in tests.
go func() {
<-r.deallocated
}()
// NB: The sync goroutine acts as a check that the RocksDB instance was
// properly closed as the goroutine will leak otherwise.
go r.syncLoop()
return nil
}

func (r *RocksDB) syncLoop() {
// Lock the OS thread to prevent other goroutines from running on it. We
// don't want other goroutines to run on this thread because of the
// relatively long (multiple millisecond) Cgo calls that are
// performed. Scheduling other goroutines on this thread would introduce
// latency to those goroutines whenever this goroutine needs to sync the WAL.
runtime.LockOSThread()
defer runtime.UnlockOSThread()

s := &r.syncer
s.Lock()
defer s.Unlock()

var lastSync time.Time

for {
for len(s.pending) == 0 && !s.closed {
s.cond.Wait()
}
if s.closed {
return
}

min := minWALSyncInterval.Get()
if delta := timeutil.Since(lastSync); delta < min {
s.Unlock()
time.Sleep(min - delta)
s.Lock()
}

pending := s.pending
s.pending = nil

s.Unlock()

var err error
if r.dir != "" {
err = statusToError(C.DBSyncWAL(r.rdb))
lastSync = timeutil.Now()
}

for _, b := range pending {
b.commitErr = err
b.commitWG.Done()
}

s.Lock()
}
}

// Close closes the database by deallocating the underlying handle.
func (r *RocksDB) Close() {
if r.rdb == nil {
@@ -443,7 +499,10 @@ func (r *RocksDB) Close() {
r.rdb = nil
}
r.cache.Release()
close(r.deallocated)
r.syncer.Lock()
r.syncer.closed = true
r.syncer.cond.Signal()
r.syncer.Unlock()
}

// Closed returns true if the engine is closed.
@@ -1088,7 +1147,9 @@ type rocksDBBatch struct {
distinctOpen bool
distinctNeedsFlush bool
writeOnly bool
syncCommit bool
commitErr error
commitWG sync.WaitGroup
}

func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch {
@@ -1258,30 +1319,24 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error {
// batches commits together. While the batching below often can batch 20 or
// 30 concurrent commits.
if r.writeOnly {
// The leader for the commit is the first batch to be added to the pending
// slice. Each commit has an associated sequence number. For a given
// sequence number, there can be only a single leader.
c := &r.parent.commit
r.commitWG.Add(1)
r.syncCommit = syncCommit

// The leader for the commit is the first batch to be added to the pending
// slice. Every batch has an associated wait group which is signalled when
// the commit is complete.
c.Lock()
leader := len(c.pending) == 0
// Perform a sync if any of the commits require a sync.
c.pendingSync = c.pendingSync || syncCommit
c.pending = append(c.pending, r)
seq := c.pendingSeq

if leader {
// We're the leader. Wait for any running commit to finish.
for c.committing {
c.cond.Wait()
}
if seq != c.pendingSeq {
log.Fatalf(context.TODO(), "expected commit sequence %d, but found %d", seq, c.pendingSeq)
}
pending := c.pending
syncCommit = c.pendingSync
c.pending = nil
c.pendingSeq++
c.pendingSync = false
c.committing = true
c.Unlock()

@@ -1294,25 +1349,48 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error {
}

if err == nil {
err = r.commitInternal(syncCommit)
err = r.commitInternal(false /* sync */)
}

// Propagate the error to all of the batches involved in the commit.
// We're done committing the batch, let the next group of batches
// proceed.
c.Lock()
c.committing = false
c.cond.Signal()
c.Unlock()

// Propagate the error to all of the batches involved in the commit. If a
// batch requires syncing and the commit was successful, add it to the
// syncing list. Note that we're reusing the pending list here for the
// syncing list.
syncing := pending[:0]
for _, b := range pending {
b.commitErr = err
if err != nil || !b.syncCommit {
b.commitErr = err
b.commitWG.Done()
} else {
syncing = append(syncing, b)
}
}

c.Lock()
c.committing = false
c.commitSeq = seq
c.cond.Broadcast()
} else {
// We're a follower. Wait for the commit to finish.
for c.commitSeq < seq {
c.cond.Wait()
if len(syncing) > 0 {
// The commit was successful and one or more of the batches requires
// syncing: notify the sync goroutine.
s := &r.parent.syncer
s.Lock()
if len(s.pending) == 0 {
s.pending = syncing
} else {
s.pending = append(s.pending, syncing...)
}
s.cond.Signal()
s.Unlock()
}
} else {
c.Unlock()
}
c.Unlock()
// Wait for the commit/sync to finish.
r.commitWG.Wait()
return r.commitErr
}
