Skip to content

Commit

Permalink
storage: move Raft log configurations into base.RaftConfig
Browse files Browse the repository at this point in the history
This centralizes all Raft configuration and makes it easier
to configure in tests.

Release note: None
  • Loading branch information
nvanbenschoten committed Oct 17, 2018
1 parent 7a80951 commit 62ea444
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 56 deletions.
51 changes: 49 additions & 2 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,28 @@ const (
DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
var (
// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
// invocations that must pass between elections.
defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)

// defaultRaftLogTruncationThreshold specifies the upper bound that a single
// Range's Raft log can grow to before log truncations are triggered, even
// if that means a snapshot will be required for a straggling follower.
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
"COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 4<<20 /* 4 MB */)

// defaultRaftMaxSizePerMsg specifies the maximum number of Raft log entries
// that a leader will send to followers in a single MsgApp.
defaultRaftMaxSizePerMsg = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16<<10 /* 16 KB */)

// defaultRaftMaxSizePerMsg specifies how many "inflight" messages a leader
// will send to a follower without hearing a response.
defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
)

type lazyHTTPClient struct {
once sync.Once
Expand Down Expand Up @@ -421,6 +441,24 @@ type RaftConfig struct {
// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
// lease active duration should be of the raft election timeout.
RangeLeaseRaftElectionTimeoutMultiplier float64

// RaftLogTruncationThreshold controls how large a single Range's Raft log
// can grow. When a Range's Raft log grows above this size, the Range will
// begin performing log truncations.
RaftLogTruncationThreshold int64

// RaftMaxSizePerMsg controls how many Raft log entries the leader will send to
// followers in a single MsgApp.
RaftMaxSizePerMsg uint64

// RaftMaxInflightMsgs controls how many "inflight" messages Raft will send
// to a follower without hearing a response. The total number of Raft log
// entries is a combination of this setting and RaftMaxSizePerMsg. The
// current default settings provide for up to 1 MB of raft log to be sent
// without acknowledgement. With an average entry size of 1 KB that
// translates to ~1024 commands that might be executed in the handling of a
// single raft.Ready operation.
RaftMaxInflightMsgs int
}

// SetDefaults initializes unset fields.
Expand All @@ -434,6 +472,15 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 {
cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier
}
if cfg.RaftLogTruncationThreshold == 0 {
cfg.RaftLogTruncationThreshold = defaultRaftLogTruncationThreshold
}
if cfg.RaftMaxSizePerMsg == 0 {
cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
}
if cfg.RaftMaxInflightMsgs == 0 {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}
}

// RaftElectionTimeout returns the raft election timeout, as computed from the
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand All @@ -49,9 +48,6 @@ const (
raftLogQueueConcurrency = 4
)

// raftLogMaxSize limits the maximum size of the Raft log.
var raftLogMaxSize = envutil.EnvOrDefaultInt64("COCKROACH_RAFT_LOG_MAX_SIZE", 4<<20 /* 4 MB */)

// raftLogQueue manages a queue of replicas slated to have their raft logs
// truncated by removing unneeded entries.
type raftLogQueue struct {
Expand Down Expand Up @@ -118,8 +114,8 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int
if targetSize > *r.mu.zone.RangeMaxBytes {
targetSize = *r.mu.zone.RangeMaxBytes
}
if targetSize > raftLogMaxSize {
targetSize = raftLogMaxSize
if targetSize > r.store.cfg.RaftLogTruncationThreshold {
targetSize = r.store.cfg.RaftLogTruncationThreshold
}
firstIndex, err := r.raftFirstIndexLocked()
pendingSnapshotIndex := r.mu.pendingSnapshotIndex
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"google.golang.org/grpc"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -615,6 +616,7 @@ func (t *RaftTransport) startProcessNewQueue(
// for closing the OutgoingSnapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
raftCfg *base.RaftConfig,
storePool *StorePool,
header SnapshotRequest_Header,
snap *OutgoingSnapshot,
Expand All @@ -640,5 +642,5 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %s", err)
}
}()
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent)
return sendSnapshot(ctx, raftCfg, t.st, stream, storePool, header, snap, newBatch, sent)
}
29 changes: 16 additions & 13 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,8 @@ const (
defaultReplicaRaftMuWarnThreshold = 500 * time.Millisecond
)

var raftLogTooLargeSize = 4 * raftLogMaxSize

var testingDisableQuiescence = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_QUIESCENCE", false)

// TODO(irfansharif, peter): What's a good default? Too low and everything comes
// to a grinding halt, too high and we're not really throttling anything
// (we'll still generate snapshots). Should it be adjusted dynamically?
//
// We set the defaultProposalQuota to be less than raftLogMaxSize, in doing so
// we ensure all replicas have sufficiently up to date logs so that when the
// log gets truncated, the followers do not need non-preemptive snapshots.
var defaultProposalQuota = raftLogMaxSize / 4

var syncRaftLog = settings.RegisterBoolSetting(
"kv.raft_log.synchronize",
"set to true to synchronize on Raft log writes to persistent storage ('false' risks data loss)",
Expand Down Expand Up @@ -1129,12 +1118,23 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
log.Fatalf(ctx, "len(r.mu.commandSizes) = %d, expected 0", commandSizesLen)
}

// We set the defaultProposalQuota to be less than the Raft log
// truncation threshold, in doing so we ensure all replicas have
// sufficiently up to date logs so that when the log gets truncated,
// the followers do not need non-preemptive snapshots. Changing this
// deserves care. Too low and everything comes to a grinding halt,
// too high and we're not really throttling anything (we'll still
// generate snapshots).
//
// TODO(nvanbenschoten): clean this up in later commits.
proposalQuota := r.store.cfg.RaftLogTruncationThreshold / 4

// Raft may propose commands itself (specifically the empty
// commands when leadership changes), and these commands don't go
// through the code paths where we acquire quota from the pool. To
// offset this we reset the quota pool whenever leadership changes
// hands.
r.mu.proposalQuota = newQuotaPool(defaultProposalQuota)
r.mu.proposalQuota = newQuotaPool(proposalQuota)
r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time)
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
} else if r.mu.proposalQuota != nil {
Expand Down Expand Up @@ -6967,6 +6967,7 @@ func (r *Replica) Metrics(
return calcReplicaMetrics(
ctx,
now,
&r.store.cfg.RaftConfig,
zone,
livenessMap,
availableNodes,
Expand Down Expand Up @@ -6994,6 +6995,7 @@ func HasRaftLeader(raftStatus *raft.Status) bool {
func calcReplicaMetrics(
ctx context.Context,
now hlc.Timestamp,
raftCfg *base.RaftConfig,
zone *config.ZoneConfig,
livenessMap IsLiveMap,
availableNodes int,
Expand Down Expand Up @@ -7033,7 +7035,8 @@ func calcReplicaMetrics(
m.CmdQMetricsLocal = cmdQMetricsLocal
m.CmdQMetricsGlobal = cmdQMetricsGlobal

m.RaftLogTooLarge = raftLogSize > raftLogTooLargeSize
const raftLogTooLargeMultiple = 4
m.RaftLogTooLarge = raftLogSize > (raftLogTooLargeMultiple * raftCfg.RaftLogTruncationThreshold)

return m
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,14 @@ func (r *Replica) sendSnapshot(
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
}
if err := r.store.cfg.Transport.SendSnapshot(
ctx, r.store.allocator.storePool, req, snap, r.store.Engine().NewBatch, sent); err != nil {
ctx,
&r.store.cfg.RaftConfig,
r.store.allocator.storePool,
req,
snap,
r.store.Engine().NewBatch,
sent,
); err != nil {
return &snapshotError{err}
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
mockSender := &mockSender{}
if err := sendSnapshot(
ctx,
&tc.store.cfg.RaftConfig,
tc.store.cfg.Settings,
mockSender,
&fakeStorePool{},
Expand Down Expand Up @@ -904,6 +905,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
mockSender := &mockSender{}
err = sendSnapshot(
ctx,
&tc.store.cfg.RaftConfig,
tc.store.cfg.Settings,
mockSender,
&fakeStorePool{},
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9272,7 +9272,7 @@ func TestReplicaMetrics(t *testing.T) {
Underreplicated: false,
}},
// The leader of a 1-replica range is up and raft log is too large.
{1, 1, desc(1), status(1, progress(2)), live(1), 5 * raftLogMaxSize,
{1, 1, desc(1), status(1, progress(2)), live(1), 5 * cfg.RaftLogTruncationThreshold,
ReplicaMetrics{
Leader: true,
RangeCounter: true,
Expand All @@ -9294,7 +9294,7 @@ func TestReplicaMetrics(t *testing.T) {
c.expected.Quiescent = i%2 == 0
c.expected.Ticking = !c.expected.Quiescent
metrics := calcReplicaMetrics(
context.Background(), hlc.Timestamp{}, &zoneConfig,
context.Background(), hlc.Timestamp{}, &cfg.RaftConfig, &zoneConfig,
c.liveness, 0, &c.desc, c.raftStatus, storagepb.LeaseStatus{},
c.storeID, c.expected.Quiescent, c.expected.Ticking,
CommandQueueMetrics{}, CommandQueueMetrics{}, c.raftLogSize)
Expand Down
31 changes: 8 additions & 23 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,35 +163,20 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
return sc
}

var (
raftMaxSizePerMsg = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
)

func newRaftConfig(
strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
) *raft.Config {
return &raft.Config{
ID: id,
Applied: appliedIndex,
ElectionTick: storeCfg.RaftElectionTimeoutTicks,
HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks,
Storage: strg,
Logger: logger,
ID: id,
Applied: appliedIndex,
ElectionTick: storeCfg.RaftElectionTimeoutTicks,
HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks,
MaxSizePerMsg: storeCfg.RaftMaxSizePerMsg,
MaxInflightMsgs: storeCfg.RaftMaxInflightMsgs,
Storage: strg,
Logger: logger,

PreVote: true,

// MaxSizePerMsg controls how many Raft log entries the leader will send to
// followers in a single MsgApp.
MaxSizePerMsg: uint64(raftMaxSizePerMsg),
// MaxInflightMsgs controls how many "inflight" messages Raft will send to
// a follower without hearing a response. The total number of Raft log
// entries is a combination of this setting and MaxSizePerMsg. The current
// settings provide for up to 1 MB of raft log to be sent without
// acknowledgement. With an average entry size of 1 KB that translates to
// ~1024 commands that might be executed in the handling of a single
// raft.Ready operation.
MaxInflightMsgs: raftMaxInflightMsgs,
}
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/time/rate"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -98,7 +99,8 @@ func assertStrategy(
// kvBatchSnapshotStrategy is an implementation of snapshotStrategy that streams
// batches of KV pairs in the BatchRepr format.
type kvBatchSnapshotStrategy struct {
status string
raftCfg *base.RaftConfig
status string

// Fields used when sending snapshots.
batchSize int64
Expand Down Expand Up @@ -228,7 +230,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
if err == nil {
logEntries = append(logEntries, bytes)
raftLogBytes += int64(len(bytes))
if snap.snapType == snapTypePreemptive && raftLogBytes > 4*raftLogMaxSize {
if snap.snapType == snapTypePreemptive &&
raftLogBytes > 4*kvSS.raftCfg.RaftLogTruncationThreshold {
// If the raft log is too large, abort the snapshot instead of
// potentially running out of memory. However, if this is a
// raft-initiated snapshot (instead of a preemptive one), we
Expand Down Expand Up @@ -492,7 +495,9 @@ func (s *Store) receiveSnapshot(
var ss snapshotStrategy
switch header.Strategy {
case SnapshotRequest_KV_BATCH:
ss = &kvBatchSnapshotStrategy{}
ss = &kvBatchSnapshotStrategy{
raftCfg: &s.cfg.RaftConfig,
}
default:
return sendSnapshotError(stream,
errors.Errorf("%s,r%d: unknown snapshot strategy: %s",
Expand Down Expand Up @@ -568,6 +573,7 @@ func (e *errMustRetrySnapshotDueToTruncation) Error() string {
// sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
func sendSnapshot(
ctx context.Context,
raftCfg *base.RaftConfig,
st *cluster.Settings,
stream outgoingSnapshotStream,
storePool SnapshotStorePool,
Expand Down Expand Up @@ -635,6 +641,7 @@ func sendSnapshot(
switch header.Strategy {
case SnapshotRequest_KV_BATCH:
ss = &kvBatchSnapshotStrategy{
raftCfg: raftCfg,
batchSize: batchSize,
limiter: limiter,
newBatch: newBatch,
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/store_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSnapshotRaftLogLimit(t *testing.T) {

var bytesWritten int64
blob := []byte(strings.Repeat("a", 1024*1024))
for i := 0; bytesWritten < 5*raftLogMaxSize; i++ {
for i := 0; bytesWritten < 5*store.cfg.RaftLogTruncationThreshold; i++ {
pArgs := putArgs(roachpb.Key("a"), blob)
_, pErr := client.SendWrappedWith(ctx, store, roachpb.Header{RangeID: 1}, &pArgs)
if pErr != nil {
Expand All @@ -65,6 +65,7 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
defer snap.Close()

ss := kvBatchSnapshotStrategy{
raftCfg: &store.cfg.RaftConfig,
limiter: rate.NewLimiter(1<<10, 1),
newBatch: eng.NewBatch,
}
Expand Down
Loading

0 comments on commit 62ea444

Please sign in to comment.