From 82cbb49ff7e1db6e1b01d96f1100954b599b6c5f Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Fri, 7 Jul 2017 15:17:36 -0400
Subject: [PATCH] storage/engine: use SyncWAL instead of sync on commit

The previous implementation of batching for synced commits had the
unfortunate property that commits that did not require syncing were
forced to wait for the WAL to be synced. When RocksDB writes a batch
with sync==true it internally does:

1. Add batch to WAL
2. Sync WAL
3. Add entries to mem table

Switch to using SyncWAL to explicitly sync the WAL after writing a
batch. This gives slightly different semantics from the above:

1. Add batch to WAL
2. Add entries to mem table
3. Sync WAL

The advantage of the new approach is that non-synced batches do not
have to wait for the WAL to sync. Prior to this change, it was observed
that essentially every batch was waiting for a WAL sync. Approximately
half of all batch commits are performed with sync==true (half of all
batch commits are for Raft log entries or Raft state), so forcing the
non-synced commits to wait for the WAL sync added significant latency.

Reworked the implementation of batch grouping. The sequence number
mechanism was replaced by a per-batch sync.WaitGroup embedded in the
rocksDBBatch structure. Once a batch is committed (and synced, if
requested), its wait group is signalled, ensuring that only the desired
goroutine is woken instead of waking all of the waiting goroutines as
in the previous implementation. Syncing of the WAL is performed by a
dedicated thread, and only the batches which specify sync==true wait
for the syncing to occur.

Added the "rocksdb.min_wal_sync_interval" setting, which specifies a
minimum delay to wait between calls to SyncWAL. The default of 1ms was
experimentally determined to reduce disk IOs by ~50% (vs 0ms) while
leaving write throughput unchanged.

For a write-only workload (`ycsb -workload F`), write throughput
improved by 15%. Raft command commit latency (unsynced commits) dropped
from 30-40ms to 6-10ms.
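To make the grouping scheme above concrete, here is a minimal, self-contained
Go sketch of a commit pipeline in which every batch carries its own
sync.WaitGroup and the first batch to arrive acts as the group leader. This is
an illustration only, not the code in the diff below: the names db, batch, and
writeBatches are hypothetical, and the real logic lives in rocksDBBatch.Commit.

    package main

    import (
        "fmt"
        "sync"
    )

    type batch struct {
        data []byte
        err  error          // outcome of the grouped commit
        wg   sync.WaitGroup // signalled exactly once when this batch is done
    }

    type db struct {
        mu         sync.Mutex
        cond       sync.Cond
        committing bool
        pending    []*batch
    }

    func newDB() *db {
        d := &db{}
        d.cond.L = &d.mu
        return d
    }

    // commit adds b to the pending group. The leader (the goroutine that found
    // the pending slice empty) waits for any in-flight group to finish, writes
    // the whole group, and then signals each batch's wait group individually,
    // so only the goroutines that own those batches are woken.
    func (d *db) commit(b *batch) error {
        b.wg.Add(1)

        d.mu.Lock()
        leader := len(d.pending) == 0
        d.pending = append(d.pending, b)

        if leader {
            // Wait for the previous group's write to finish.
            for d.committing {
                d.cond.Wait()
            }
            group := d.pending
            d.pending = nil
            d.committing = true
            d.mu.Unlock()

            err := writeBatches(group) // stand-in for the single engine write

            // Let the next leader proceed, then notify each batch in the group.
            d.mu.Lock()
            d.committing = false
            d.cond.Signal()
            d.mu.Unlock()

            for _, gb := range group {
                gb.err = err
                gb.wg.Done()
            }
        } else {
            d.mu.Unlock()
        }

        // Followers (and the leader, for uniformity) block here until their
        // own batch has been written.
        b.wg.Wait()
        return b.err
    }

    // writeBatches is a placeholder for the one engine write covering the group.
    func writeBatches(group []*batch) error { return nil }

    func main() {
        d := newDB()
        var wg sync.WaitGroup
        for i := 0; i < 8; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                b := &batch{data: []byte(fmt.Sprintf("batch-%d", i))}
                if err := d.commit(b); err != nil {
                    fmt.Println("commit error:", err)
                }
            }(i)
        }
        wg.Wait()
    }

Because at most one leader can be waiting on the condition variable at a time,
a Signal (rather than a Broadcast) is sufficient to hand off to the next group.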
--- .../logictest/testdata/logic_test/show_source | 1 + pkg/storage/engine/db.cc | 4 + pkg/storage/engine/db.h | 4 + pkg/storage/engine/rocksdb.go | 164 +++++++++++++----- pkg/storage/replica.go | 11 ++ 5 files changed, 141 insertions(+), 43 deletions(-) diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index 9399034e094e..68e2d739ffb7 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -62,6 +62,7 @@ kv.raft_log.synchronize true b set to t kv.snapshot_rebalance.max_rate 2.0 MiB z the rate limit (bytes/sec) to use for rebalance snapshots kv.snapshot_recovery.max_rate 8.0 MiB z the rate limit (bytes/sec) to use for recovery snapshots kv.transaction.max_intents 100000 i maximum number of write intents allowed for a KV transaction +rocksdb.min_wal_sync_interval 1ms d minimum duration between syncs of the RocksDB WAL server.declined_reservation_timeout 1s d the amount of time to consider the store throttled for up-replication after a reservation was declined server.failed_reservation_timeout 5s d the amount of time to consider the store throttled for up-replication after a failed reservation call server.remote_debugging.mode local s set to enable remote debugging, localhost-only or disable (any, local, off) diff --git a/pkg/storage/engine/db.cc b/pkg/storage/engine/db.cc index 0206d2d1dc5c..2d280c1b79e4 100644 --- a/pkg/storage/engine/db.cc +++ b/pkg/storage/engine/db.cc @@ -1667,6 +1667,10 @@ DBStatus DBFlush(DBEngine* db) { return ToDBStatus(db->rep->Flush(options)); } +DBStatus DBSyncWAL(DBEngine* db) { + return ToDBStatus(db->rep->SyncWAL()); +} + DBStatus DBCompact(DBEngine* db) { rocksdb::CompactRangeOptions options; // By default, RocksDB doesn't recompact the bottom level (unless diff --git a/pkg/storage/engine/db.h b/pkg/storage/engine/db.h index c26b62b43d3b..a21de2f84b25 100644 --- a/pkg/storage/engine/db.h +++ b/pkg/storage/engine/db.h @@ -102,6 +102,10 @@ void DBClose(DBEngine* db); // complete. DBStatus DBFlush(DBEngine* db); +// Syncs the RocksDB WAL ensuring all data is persisted to +// disk. Blocks until the operation is complete. +DBStatus DBSyncWAL(DBEngine* db); + // Forces an immediate compaction over all keys. DBStatus DBCompact(DBEngine* db); diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index a7f410a1ada4..e0a342eabf26 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -39,6 +39,7 @@ import ( "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -63,6 +64,11 @@ import ( // #include "db.h" import "C" +var minWALSyncInterval = settings.RegisterDurationSetting( + "rocksdb.min_wal_sync_interval", + "minimum duration between syncs of the RocksDB WAL", + 1*time.Millisecond) + //export rocksDBLog func rocksDBLog(s *C.char, n C.int) { // Note that rocksdb logging is only enabled if log.V(3) is true @@ -92,7 +98,7 @@ const ( // max_open_files option. If more file descriptors are available than the // recommended number, than the default value is used. RecommendedMaxOpenFiles = 10000 - // MinimumMaxOpenFiles is The minimum value that rocksDB's max_open_files + // MinimumMaxOpenFiles is the minimum value that rocksDB's max_open_files // option can be set to. 
While this should be set as high as possible, the // minimum total for a single store node must be under 2048 for Windows // compatibility. See: @@ -294,16 +300,19 @@ type RocksDB struct { cache RocksDBCache // Shared cache. maxSize int64 // Used for calculating rebalancing and free space. maxOpenFiles int // The maximum number of open files this instance will use. - deallocated chan struct{} // Closed when the underlying handle is deallocated. commit struct { syncutil.Mutex - cond *sync.Cond - committing bool - commitSeq uint64 - pendingSeq uint64 - pendingSync bool - pending []*rocksDBBatch + cond sync.Cond + committing bool + pending []*rocksDBBatch + } + + syncer struct { + syncutil.Mutex + cond sync.Cond + closed bool + pending []*rocksDBBatch } } @@ -328,7 +337,6 @@ func NewRocksDB( cache: cache.ref(), maxSize: maxSize, maxOpenFiles: maxOpenFiles, - deallocated: make(chan struct{}), } auxDir := filepath.Join(dir, "auxiliary") @@ -346,9 +354,8 @@ func newMemRocksDB(attrs roachpb.Attributes, cache RocksDBCache, maxSize int64) r := &RocksDB{ attrs: attrs, // dir: empty dir == "mem" RocksDB instance. - cache: cache.ref(), - maxSize: maxSize, - deallocated: make(chan struct{}), + cache: cache.ref(), + maxSize: maxSize, } if err := r.SetAuxiliaryDir(os.TempDir()); err != nil { @@ -415,16 +422,65 @@ func (r *RocksDB) open() error { } } - r.commit.cond = sync.NewCond(&r.commit.Mutex) + r.commit.cond.L = &r.commit.Mutex + r.syncer.cond.L = &r.syncer.Mutex - // Start a goroutine that will finish when the underlying handle - // is deallocated. This is used to check a leak in tests. - go func() { - <-r.deallocated - }() + // NB: The sync goroutine acts as a check that the RocksDB instance was + // properly closed as the goroutine will leak otherwise. + go r.syncLoop() return nil } +func (r *RocksDB) syncLoop() { + // Lock the OS thread to prevent other goroutines from running on it. We + // don't want other goroutines to run on this thread because of the + // relatively long (multiple millisecond) Cgo calls that are + // performed. Scheduling other goroutines on this thread would introduce + // latency to those goroutines whenever this goroutine needs to sync the WAL. + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + s := &r.syncer + s.Lock() + defer s.Unlock() + + var lastSync time.Time + + for { + for len(s.pending) == 0 && !s.closed { + s.cond.Wait() + } + if s.closed { + return + } + + min := minWALSyncInterval.Get() + if delta := timeutil.Since(lastSync); delta < min { + s.Unlock() + time.Sleep(min - delta) + s.Lock() + } + + pending := s.pending + s.pending = nil + + s.Unlock() + + var err error + if r.dir != "" { + err = statusToError(C.DBSyncWAL(r.rdb)) + lastSync = timeutil.Now() + } + + for _, b := range pending { + b.commitErr = err + b.commitWG.Done() + } + + s.Lock() + } +} + // Close closes the database by deallocating the underlying handle. func (r *RocksDB) Close() { if r.rdb == nil { @@ -443,7 +499,10 @@ func (r *RocksDB) Close() { r.rdb = nil } r.cache.Release() - close(r.deallocated) + r.syncer.Lock() + r.syncer.closed = true + r.syncer.cond.Signal() + r.syncer.Unlock() } // Closed returns true if the engine is closed. @@ -1088,7 +1147,9 @@ type rocksDBBatch struct { distinctOpen bool distinctNeedsFlush bool writeOnly bool + syncCommit bool commitErr error + commitWG sync.WaitGroup } func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch { @@ -1258,30 +1319,24 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error { // batches commits together. 
While the batching below often can batch 20 or // 30 concurrent commits. if r.writeOnly { - // The leader for the commit is the first batch to be added to the pending - // slice. Each commit has an associated sequence number. For a given - // sequence number, there can be only a single leader. c := &r.parent.commit + r.commitWG.Add(1) + r.syncCommit = syncCommit + + // The leader for the commit is the first batch to be added to the pending + // slice. Every batch has an associated wait group which is signalled when + // the commit is complete. c.Lock() leader := len(c.pending) == 0 - // Perform a sync if any of the commits require a sync. - c.pendingSync = c.pendingSync || syncCommit c.pending = append(c.pending, r) - seq := c.pendingSeq if leader { // We're the leader. Wait for any running commit to finish. for c.committing { c.cond.Wait() } - if seq != c.pendingSeq { - log.Fatalf(context.TODO(), "expected commit sequence %d, but found %d", seq, c.pendingSeq) - } pending := c.pending - syncCommit = c.pendingSync c.pending = nil - c.pendingSeq++ - c.pendingSync = false c.committing = true c.Unlock() @@ -1294,25 +1349,48 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error { } if err == nil { - err = r.commitInternal(syncCommit) + err = r.commitInternal(false /* sync */) } - // Propagate the error to all of the batches involved in the commit. + // We're done committing the batch, let the next group of batches + // proceed. + c.Lock() + c.committing = false + c.cond.Signal() + c.Unlock() + + // Propagate the error to all of the batches involved in the commit. If a + // batch requires syncing and the commit was successful, add it to the + // syncing list. Note that we're reusing the pending list here for the + // syncing list. + syncing := pending[:0] for _, b := range pending { - b.commitErr = err + if err != nil || !b.syncCommit { + b.commitErr = err + b.commitWG.Done() + } else { + syncing = append(syncing, b) + } } - c.Lock() - c.committing = false - c.commitSeq = seq - c.cond.Broadcast() - } else { - // We're a follower. Wait for the commit to finish. - for c.commitSeq < seq { - c.cond.Wait() + if len(syncing) > 0 { + // The commit was successful and one or more of the batches requires + // syncing: notify the sync goroutine. + s := &r.parent.syncer + s.Lock() + if len(s.pending) == 0 { + s.pending = syncing + } else { + s.pending = append(s.pending, syncing...) + } + s.cond.Signal() + s.Unlock() } + } else { + c.Unlock() } - c.Unlock() + // Wait for the commit/sync to finish. + r.commitWG.Wait() return r.commitErr } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 0698d2f001c3..8e32e32358eb 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -3193,6 +3193,17 @@ func (r *Replica) handleRaftReadyRaftMuLocked( writer.Close() // Synchronously commit the batch with the Raft log entries and Raft hard // state as we're promising not to lose this data. + // + // Note that the data is visible to other goroutines before it is synced to + // disk. This is fine. The important constraints are that these syncs happen + // before Raft messages are sent and before the call to RawNode.Advance. Our + // regular locking is sufficient for this and if other goroutines can see the + // data early, that's fine. In particular, snapshots are not a problem (I + // think they're the only thing that might access log entries or HardState + // from other goroutines). 
Snapshots do not include either the HardState or + // uncommitted log entries, and even if they did include log entries that + // were not persisted to disk, it wouldn't be a problem because raft does not + // infer that the entries are persisted on the node that sends a snapshot. start := timeutil.Now() if err := batch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil { return stats, err
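To illustrate the dedicated WAL-sync goroutine that the commit message
describes and that syncLoop implements above, here is a minimal, self-contained
Go sketch of the same pattern: batches that were committed with sync==true are
handed to a single goroutine, which rate-limits syncs by a minimum interval so
that one sync covers many batches, and then signals each waiting batch
individually. All names here (syncer, syncRequest, syncWAL) are hypothetical;
syncWAL merely stands in for the real C.DBSyncWAL call.

    package main

    import (
        "sync"
        "time"
    )

    type syncRequest struct {
        err error
        wg  sync.WaitGroup // signalled once the WAL covering this batch is synced
    }

    type syncer struct {
        mu       sync.Mutex
        cond     sync.Cond
        closed   bool
        pending  []*syncRequest
        minDelay time.Duration
    }

    func newSyncer(minDelay time.Duration) *syncer {
        s := &syncer{minDelay: minDelay}
        s.cond.L = &s.mu
        go s.loop()
        return s
    }

    // enqueue hands a committed-but-unsynced batch to the sync goroutine and
    // blocks until the WAL has been synced past it.
    func (s *syncer) enqueue(r *syncRequest) error {
        r.wg.Add(1)
        s.mu.Lock()
        s.pending = append(s.pending, r)
        s.cond.Signal()
        s.mu.Unlock()
        r.wg.Wait()
        return r.err
    }

    func (s *syncer) close() {
        s.mu.Lock()
        s.closed = true
        s.cond.Signal()
        s.mu.Unlock()
    }

    func (s *syncer) loop() {
        s.mu.Lock()
        defer s.mu.Unlock()

        var lastSync time.Time
        for {
            for len(s.pending) == 0 && !s.closed {
                s.cond.Wait()
            }
            if s.closed {
                return
            }
            // Rate-limit syncs: sleeping here lets more batches accumulate,
            // so a single WAL sync covers all of them.
            if delta := time.Since(lastSync); delta < s.minDelay {
                s.mu.Unlock()
                time.Sleep(s.minDelay - delta)
                s.mu.Lock()
            }
            pending := s.pending
            s.pending = nil
            s.mu.Unlock()

            err := syncWAL() // stand-in for C.DBSyncWAL(rdb)
            lastSync = time.Now()

            for _, r := range pending {
                r.err = err
                r.wg.Done()
            }
            s.mu.Lock()
        }
    }

    // syncWAL is a placeholder for the actual WAL sync.
    func syncWAL() error { return nil }

    func main() {
        s := newSyncer(time.Millisecond)
        defer s.close()
        var r syncRequest
        _ = s.enqueue(&r) // blocks until the (placeholder) WAL sync completes
    }

In the sketch, as in the diff, only callers that actually asked for durability
pay for the sync; unsynced commits never touch the syncer at all.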