diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source
index 9399034e094e..68e2d739ffb7 100644
--- a/pkg/sql/logictest/testdata/logic_test/show_source
+++ b/pkg/sql/logictest/testdata/logic_test/show_source
@@ -62,6 +62,7 @@ kv.raft_log.synchronize true b set to t
 kv.snapshot_rebalance.max_rate 2.0 MiB z the rate limit (bytes/sec) to use for rebalance snapshots
 kv.snapshot_recovery.max_rate 8.0 MiB z the rate limit (bytes/sec) to use for recovery snapshots
 kv.transaction.max_intents 100000 i maximum number of write intents allowed for a KV transaction
+rocksdb.min_wal_sync_interval 1ms d minimum duration between syncs of the RocksDB WAL
 server.declined_reservation_timeout 1s d the amount of time to consider the store throttled for up-replication after a reservation was declined
 server.failed_reservation_timeout 5s d the amount of time to consider the store throttled for up-replication after a failed reservation call
 server.remote_debugging.mode local s set to enable remote debugging, localhost-only or disable (any, local, off)
diff --git a/pkg/storage/engine/db.cc b/pkg/storage/engine/db.cc
index 0206d2d1dc5c..2d280c1b79e4 100644
--- a/pkg/storage/engine/db.cc
+++ b/pkg/storage/engine/db.cc
@@ -1667,6 +1667,10 @@ DBStatus DBFlush(DBEngine* db) {
   return ToDBStatus(db->rep->Flush(options));
 }
 
+DBStatus DBSyncWAL(DBEngine* db) {
+  return ToDBStatus(db->rep->SyncWAL());
+}
+
 DBStatus DBCompact(DBEngine* db) {
   rocksdb::CompactRangeOptions options;
   // By default, RocksDB doesn't recompact the bottom level (unless
diff --git a/pkg/storage/engine/db.h b/pkg/storage/engine/db.h
index c26b62b43d3b..a21de2f84b25 100644
--- a/pkg/storage/engine/db.h
+++ b/pkg/storage/engine/db.h
@@ -102,6 +102,10 @@ void DBClose(DBEngine* db);
 // complete.
 DBStatus DBFlush(DBEngine* db);
 
+// Syncs the RocksDB WAL ensuring all data is persisted to
+// disk. Blocks until the operation is complete.
+DBStatus DBSyncWAL(DBEngine* db);
+
 // Forces an immediate compaction over all keys.
 DBStatus DBCompact(DBEngine* db);
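The new entry point is a thin shim over `rocksdb::DB::SyncWAL()`, which fsyncs the write-ahead log without flushing memtables (unlike `DBFlush` above). The patch calls `C.DBSyncWAL` directly from the sync loop added to `rocksdb.go` below; purely as illustration, a Go-side wrapper built from the same pieces the patch uses (`statusToError`, `r.rdb`, and the empty-`dir` convention for in-memory instances) might look like this hypothetical sketch:

```go
// syncWAL is a hypothetical wrapper around the new C entry point; the
// patch itself invokes C.DBSyncWAL inline from syncLoop rather than
// exposing a method like this.
func (r *RocksDB) syncWAL() error {
	if r.dir == "" {
		// An empty dir means an in-memory instance, which has no WAL.
		return nil
	}
	return statusToError(C.DBSyncWAL(r.rdb))
}
```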
diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go
index a7f410a1ada4..e0a342eabf26 100644
--- a/pkg/storage/engine/rocksdb.go
+++ b/pkg/storage/engine/rocksdb.go
@@ -39,6 +39,7 @@ import (
   "golang.org/x/net/context"
 
   "github.com/cockroachdb/cockroach/pkg/roachpb"
+  "github.com/cockroachdb/cockroach/pkg/settings"
   "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
   "github.com/cockroachdb/cockroach/pkg/util/envutil"
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -63,6 +64,11 @@ import (
 // #include "db.h"
 import "C"
 
+var minWALSyncInterval = settings.RegisterDurationSetting(
+  "rocksdb.min_wal_sync_interval",
+  "minimum duration between syncs of the RocksDB WAL",
+  1*time.Millisecond)
+
 //export rocksDBLog
 func rocksDBLog(s *C.char, n C.int) {
   // Note that rocksdb logging is only enabled if log.V(3) is true
@@ -92,7 +98,7 @@ const (
   // max_open_files option. If more file descriptors are available than the
   // recommended number, than the default value is used.
   RecommendedMaxOpenFiles = 10000
-  // MinimumMaxOpenFiles is The minimum value that rocksDB's max_open_files
+  // MinimumMaxOpenFiles is the minimum value that rocksDB's max_open_files
   // option can be set to. While this should be set as high as possible, the
   // minimum total for a single store node must be under 2048 for Windows
   // compatibility. See:
@@ -294,16 +300,19 @@ type RocksDB struct {
   cache        RocksDBCache // Shared cache.
   maxSize      int64        // Used for calculating rebalancing and free space.
   maxOpenFiles int          // The maximum number of open files this instance will use.
-  deallocated  chan struct{} // Closed when the underlying handle is deallocated.
 
   commit struct {
     syncutil.Mutex
-    cond        *sync.Cond
-    committing  bool
-    commitSeq   uint64
-    pendingSeq  uint64
-    pendingSync bool
-    pending     []*rocksDBBatch
+    cond       sync.Cond
+    committing bool
+    pending    []*rocksDBBatch
+  }
+
+  syncer struct {
+    syncutil.Mutex
+    cond    sync.Cond
+    closed  bool
+    pending []*rocksDBBatch
   }
 }
@@ -328,7 +337,6 @@ func NewRocksDB(
     cache:        cache.ref(),
     maxSize:      maxSize,
     maxOpenFiles: maxOpenFiles,
-    deallocated:  make(chan struct{}),
   }
 
   auxDir := filepath.Join(dir, "auxiliary")
@@ -346,9 +354,8 @@ func newMemRocksDB(attrs roachpb.Attributes, cache RocksDBCache, maxSize int64)
   r := &RocksDB{
     attrs: attrs,
     // dir: empty dir == "mem" RocksDB instance.
-    cache:       cache.ref(),
-    maxSize:     maxSize,
-    deallocated: make(chan struct{}),
+    cache:   cache.ref(),
+    maxSize: maxSize,
   }
 
   if err := r.SetAuxiliaryDir(os.TempDir()); err != nil {
@@ -415,16 +422,65 @@ func (r *RocksDB) open() error {
     }
   }
 
-  r.commit.cond = sync.NewCond(&r.commit.Mutex)
+  r.commit.cond.L = &r.commit.Mutex
+  r.syncer.cond.L = &r.syncer.Mutex
 
-  // Start a goroutine that will finish when the underlying handle
-  // is deallocated. This is used to check a leak in tests.
-  go func() {
-    <-r.deallocated
-  }()
+  // NB: The sync goroutine acts as a check that the RocksDB instance was
+  // properly closed as the goroutine will leak otherwise.
+  go r.syncLoop()
   return nil
 }
 
+func (r *RocksDB) syncLoop() {
+  // Lock the OS thread to prevent other goroutines from running on it. We
+  // don't want other goroutines to run on this thread because of the
+  // relatively long (multiple millisecond) Cgo calls that are
+  // performed. Scheduling other goroutines on this thread would introduce
+  // latency to those goroutines whenever this goroutine needs to sync the WAL.
+  runtime.LockOSThread()
+  defer runtime.UnlockOSThread()
+
+  s := &r.syncer
+  s.Lock()
+  defer s.Unlock()
+
+  var lastSync time.Time
+
+  for {
+    for len(s.pending) == 0 && !s.closed {
+      s.cond.Wait()
+    }
+    if s.closed {
+      return
+    }
+
+    min := minWALSyncInterval.Get()
+    if delta := timeutil.Since(lastSync); delta < min {
+      s.Unlock()
+      time.Sleep(min - delta)
+      s.Lock()
+    }
+
+    pending := s.pending
+    s.pending = nil
+
+    s.Unlock()
+
+    var err error
+    if r.dir != "" {
+      err = statusToError(C.DBSyncWAL(r.rdb))
+      lastSync = timeutil.Now()
+    }
+
+    for _, b := range pending {
+      b.commitErr = err
+      b.commitWG.Done()
+    }
+
+    s.Lock()
+  }
+}
+
 // Close closes the database by deallocating the underlying handle.
 func (r *RocksDB) Close() {
   if r.rdb == nil {
@@ -443,7 +499,10 @@ func (r *RocksDB) Close() {
     r.rdb = nil
   }
   r.cache.Release()
-  close(r.deallocated)
+  r.syncer.Lock()
+  r.syncer.closed = true
+  r.syncer.cond.Signal()
+  r.syncer.Unlock()
 }
 
 // Closed returns true if the engine is closed.
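With the syncer goroutine in place, `Commit(true)` on a write-only batch no longer pays for its own fsync: the commit leader applies the batch without sync and hands it off to the syncer (see the `Commit` changes below), and the caller parks on its wait group until `DBSyncWAL` has covered it; in-memory engines skip the Cgo call but still release waiters. A caller-view sketch, assuming package-`engine` context and helpers not shown in this patch (`NewInMem`, `NewWriteOnlyBatch`, `MakeMVCCMetadataKey`):

```go
// exampleSyncedCommit shows a synced commit from the caller's side: Commit
// returns only after the group leader has applied the batch and the syncer
// goroutine has acknowledged it (syncing the WAL first on on-disk engines).
func exampleSyncedCommit() error {
	eng := NewInMem(roachpb.Attributes{}, 1<<20 /* cacheSize */)
	defer eng.Close()

	b := eng.NewWriteOnlyBatch()
	defer b.Close()
	if err := b.Put(MakeMVCCMetadataKey(roachpb.Key("a")), []byte("v")); err != nil {
		return err
	}
	// Blocks on the batch's commitWG until the syncer marks it durable.
	return b.Commit(true /* syncCommit */)
}
```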
@@ -1088,7 +1147,9 @@ type rocksDBBatch struct {
   distinctOpen       bool
   distinctNeedsFlush bool
   writeOnly          bool
+  syncCommit         bool
   commitErr          error
+  commitWG           sync.WaitGroup
 }
 
 func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch {
@@ -1258,30 +1319,24 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error {
   // batches commits together. While the batching below often can batch 20 or
   // 30 concurrent commits.
   if r.writeOnly {
-    // The leader for the commit is the first batch to be added to the pending
-    // slice. Each commit has an associated sequence number. For a given
-    // sequence number, there can be only a single leader.
     c := &r.parent.commit
+    r.commitWG.Add(1)
+    r.syncCommit = syncCommit
+
+    // The leader for the commit is the first batch to be added to the pending
+    // slice. Every batch has an associated wait group which is signalled when
+    // the commit is complete.
     c.Lock()
     leader := len(c.pending) == 0
-    // Perform a sync if any of the commits require a sync.
-    c.pendingSync = c.pendingSync || syncCommit
     c.pending = append(c.pending, r)
-    seq := c.pendingSeq
     if leader {
       // We're the leader. Wait for any running commit to finish.
       for c.committing {
         c.cond.Wait()
       }
-      if seq != c.pendingSeq {
-        log.Fatalf(context.TODO(), "expected commit sequence %d, but found %d", seq, c.pendingSeq)
-      }
       pending := c.pending
-      syncCommit = c.pendingSync
       c.pending = nil
-      c.pendingSeq++
-      c.pendingSync = false
       c.committing = true
       c.Unlock()
@@ -1294,25 +1349,48 @@ func (r *rocksDBBatch) Commit(syncCommit bool) error {
       }
 
       if err == nil {
-        err = r.commitInternal(syncCommit)
+        err = r.commitInternal(false /* sync */)
       }
 
-      // Propagate the error to all of the batches involved in the commit.
+      // We're done committing the batch, let the next group of batches
+      // proceed.
+      c.Lock()
+      c.committing = false
+      c.cond.Signal()
+      c.Unlock()
+
+      // Propagate the error to all of the batches involved in the commit. If a
+      // batch requires syncing and the commit was successful, add it to the
+      // syncing list. Note that we're reusing the pending list here for the
+      // syncing list.
+      syncing := pending[:0]
       for _, b := range pending {
-        b.commitErr = err
+        if err != nil || !b.syncCommit {
+          b.commitErr = err
+          b.commitWG.Done()
+        } else {
+          syncing = append(syncing, b)
+        }
       }
 
-      c.Lock()
-      c.committing = false
-      c.commitSeq = seq
-      c.cond.Broadcast()
-    } else {
-      // We're a follower. Wait for the commit to finish.
-      for c.commitSeq < seq {
-        c.cond.Wait()
+      if len(syncing) > 0 {
+        // The commit was successful and one or more of the batches requires
+        // syncing: notify the sync goroutine.
+        s := &r.parent.syncer
+        s.Lock()
+        if len(s.pending) == 0 {
+          s.pending = syncing
+        } else {
+          s.pending = append(s.pending, syncing...)
+        }
+        s.cond.Signal()
+        s.Unlock()
       }
+    } else {
+      c.Unlock()
     }
-    c.Unlock()
+
+    // Wait for the commit/sync to finish.
+    r.commitWG.Wait()
     return r.commitErr
   }
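The rewritten `Commit` drops the sequence-number bookkeeping (`commitSeq`/`pendingSeq`) and the follower wait loop: every batch now carries its own `commitWG`, so followers block on that instead of a broadcast condition variable, and `c.cond` is only used to hand leadership to the next group. A self-contained sketch of this leader/follower shape; the names `group`, `work`, and `commitAll` are hypothetical stand-ins for `RocksDB.commit`, `rocksDBBatch`, and `commitInternal`:

```go
package main

import "sync"

// work stands in for a *rocksDBBatch: one unit waiting to be committed.
type work struct {
	err error
	wg  sync.WaitGroup
}

// group stands in for the RocksDB.commit struct.
type group struct {
	mu         sync.Mutex
	cond       sync.Cond // initialized with cond.L = &mu
	committing bool
	pending    []*work
}

func newGroup() *group {
	g := &group{}
	g.cond.L = &g.mu
	return g
}

// commit mirrors the flow above: the first goroutine to find the pending
// list empty becomes the leader, commits the whole group with one call to
// commitAll, signals the next leader, and wakes each waiter via its
// WaitGroup.
func (g *group) commit(w *work, commitAll func([]*work) error) error {
	w.wg.Add(1)
	g.mu.Lock()
	leader := len(g.pending) == 0
	g.pending = append(g.pending, w)
	if leader {
		// Wait for any in-flight commit to finish before taking the batch.
		for g.committing {
			g.cond.Wait()
		}
		pending := g.pending
		g.pending = nil
		g.committing = true
		g.mu.Unlock()

		err := commitAll(pending)

		// Let the next leader proceed before fanning out results.
		g.mu.Lock()
		g.committing = false
		g.cond.Signal()
		g.mu.Unlock()

		for _, p := range pending {
			p.err = err
			p.wg.Done()
		}
	} else {
		g.mu.Unlock()
	}
	// Leader and followers alike wait on their own WaitGroup.
	w.wg.Wait()
	return w.err
}
```

Note that, as in the patch, the leader signals the next leader before fanning out results, so a new group can start committing while the previous group's waiters are still being released.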
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 0698d2f001c3..8e32e32358eb 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -3193,6 +3193,17 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
   writer.Close()
   // Synchronously commit the batch with the Raft log entries and Raft hard
   // state as we're promising not to lose this data.
+  //
+  // Note that the data is visible to other goroutines before it is synced to
+  // disk. This is fine. The important constraints are that these syncs happen
+  // before Raft messages are sent and before the call to RawNode.Advance. Our
+  // regular locking is sufficient for this and if other goroutines can see the
+  // data early, that's fine. In particular, snapshots are not a problem (I
+  // think they're the only thing that might access log entries or HardState
+  // from other goroutines). Snapshots do not include either the HardState or
+  // uncommitted log entries, and even if they did include log entries that
+  // were not persisted to disk, it wouldn't be a problem because raft does not
+  // infer that the entries are persisted on the node that sends a snapshot.
   start := timeutil.Now()
   if err := batch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil {
     return stats, err
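For context on the guard (not introduced by this diff): etcd/raft sets `Ready.MustSync` when the Ready carries new entries or a changed `Term`/`Vote` in the HardState, i.e., exactly the state that must be durable before any messages in that Ready go out, while `syncRaftLog` is the existing cluster setting that can disable such syncs. A condensed, hypothetical sketch of the ordering contract the new comment defends; `sendMessages` and the staging of `rd.Entries`/`rd.HardState` into `batch` are elided assumptions:

```go
// Persist (syncing if raft requires it), then send messages, then Advance.
// The WAL sync may be performed by the syncer goroutine, but Commit does
// not return until it has happened, so the ordering below is preserved.
rd := rawNode.Ready()
// ... rd.HardState and rd.Entries are written into batch here ...
if err := batch.Commit(syncRaftLog.Get() && rd.MustSync); err != nil {
	return stats, err
}
sendMessages(rd.Messages) // only after the data is durable
rawNode.Advance(rd)
```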