connect: tame thundering herd of CSRs on CA rotation #5228

Merged · 3 commits · Jan 22, 2019
13 changes: 5 additions & 8 deletions agent/agent.go
@@ -1084,15 +1084,12 @@ func (a *Agent) consulConfig() (*consul.Config, error) {

if a.config.ConnectCAProvider != "" {
base.CAConfig.Provider = a.config.ConnectCAProvider
}

// Merge with the default config if it's the consul provider.
if a.config.ConnectCAProvider == "consul" {
for k, v := range a.config.ConnectCAConfig {
base.CAConfig.Config[k] = v
}
} else {
base.CAConfig.Config = a.config.ConnectCAConfig
}
// Merge connect CA Config regardless of provider (since some config
// options, like leaf TTL, are common to all providers).
for k, v := range a.config.ConnectCAConfig {
base.CAConfig.Config[k] = v
}
}

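As an illustration of the new merge semantics above (a sketch, not part of the diff; the key names and default values are hypothetical stand-ins): the old code replaced the whole default config map for non-consul providers, while the merged loop overlays user keys onto the defaults, so options common to all providers keep their defaults unless overridden.

	defaults := map[string]interface{}{
		"LeafCertTTL":    "72h",   // hypothetical default value
		"RotationPeriod": "2160h", // hypothetical default value
	}
	user := map[string]interface{}{"LeafCertTTL": "24h"}

	// New behavior for every provider: overlay user config onto defaults.
	for k, v := range user {
		defaults[k] = v
	}
	// defaults is now {LeafCertTTL: 24h, RotationPeriod: 2160h}; previously a
	// non-consul provider would have dropped the default RotationPeriod.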
186 changes: 150 additions & 36 deletions agent/cache-types/connect_ca_leaf.go
@@ -12,21 +12,38 @@ import (

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const ConnectCALeafName = "connect-ca-leaf"

// caChangeInitialSpreadDefault is the jitter we apply after noticing the CA
// changed before requesting a new cert. Since we don't know how many services
// are in the cluster we can't be too smart about setting this so it's a
// tradeoff between not making root rotations take unnecessarily long on small
// clusters and not hammering the servers too hard on large ones. Note that
// servers will soon have CSR rate limiting that will limit the impact on big
// clusters, but a small spread in the initial requests still seems like a good
// idea and limits how many clients will hit the rate limit.
const caChangeInitialSpreadDefault = 20 * time.Second
// caChangeJitterWindow is the time over which we spread each round of retries
// when attempting to get a new certificate following a root rotation. It's
// selected to be a trade-off between not making rotation unnecessarily slow on
// a tiny cluster while not hammering the servers on a huge cluster
// unnecessarily hard. Servers rate limit to protect themselves from the
// expensive crypto work, but in practice having 10k+ RPCs all arrive in the
// same second will cause a major disruption even on large servers due to
// downloading the payloads, parsing msgpack etc. Instead we pick a window that
// for now is fixed but later might be either user configurable (not nice since
// it would become another hard-to-tune value) or set dynamically by the server
// based on its knowledge of how many certs need to be rotated. Currently the
// server doesn't know that so we pick something that is reasonable. We err on
// the side of being slower than we need in trivial cases but gentler for large
// deployments. 30s means that even with a cluster of 10k service instances,
// the server only has to cope with ~333 RPCs a second, which shouldn't be too
// bad if it's rate limiting the actual expensive crypto work.
//
// The actual backoff strategy when we are rate limited is to have each cert
// retry only once within each window of this size, at a point in the window
// selected at random. This performs much better than exponential backoff in
// terms of getting things rotated quickly with more predictable load, and so
// fewer rate limited requests. See the full simulation this is based on at
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md for
// more detail.
const caChangeJitterWindow = 30 * time.Second
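// Illustrative sketch (not part of this diff): the shape of the schedule this
// constant drives. The real code below derives the window offset from a
// randomized delay via lib.RandomStagger; this simplified version uses the
// fixed window size so the "one retry per window, at a random point" strategy
// is easier to see. nextRetry is a hypothetical helper, not a real function.
//
//	func nextRetry(rotationStart time.Time, rateLimitErrs int) time.Time {
//		// Skip the windows we already retried in (one attempt per window)...
//		windowStart := rotationStart.Add(time.Duration(rateLimitErrs) * caChangeJitterWindow)
//		// ...then pick a uniformly random instant inside the next window.
//		return windowStart.Add(time.Duration(rand.Int63n(int64(caChangeJitterWindow))))
//	}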

// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
@@ -75,6 +92,9 @@ type ConnectCALeaf struct {
// Fetch. Pointers themselves are OK, but if we point to another struct and
// call a method on it or modify it in some way, that would directly mutate the
// cache and cause problems. We'd need to deep-clone in that case in Fetch below.
// time.Time technically contains a pointer to the Location but we ignore that
// since all times we get from our wall clock should point to the same Location
// anyway.
type fetchState struct {
// authorityKeyID is the key ID of the CA root that signed the current cert.
// This is just to save parsing the whole cert every time we have to check if
@@ -84,6 +104,18 @@ type fetchState struct {
// forceExpireAfter is used to coordinate renewing certs after a CA rotation
// in a staggered way so that we don't overwhelm the servers.
forceExpireAfter time.Time

// activeRootRotationStart is set when the root has changed and we need to get
// a new cert but haven't got one yet. forceExpireAfter will be set to the
// next scheduled time we should try our CSR, but this is needed to calculate
// the retry windows if we are rate limited when we try. See comment on
// caChangeJitterWindow above for more.
activeRootRotationStart time.Time

// consecutiveRateLimitErrs stores how many rate limit errors we've hit. We
// use this to choose a new window for the next retry. See comment on
// caChangeJitterWindow above for more.
consecutiveRateLimitErrs int
}
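// Illustrative aside (not part of this diff): why the pointer-free rule above
// matters. Assigning a struct of plain value fields copies everything, so the
// cache's copy cannot be mutated through ours, whereas a map (or pointer)
// field would still be shared after the copy:
//
//	type shared struct{ m map[string]int }
//	a := shared{m: map[string]int{"x": 1}}
//	b := a       // copies the map header, not the map contents
//	b.m["x"] = 2 // also visible via a.m - the mutation hazard described above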

// fetchStart is called on each fetch that is about to block and wait for
@@ -277,22 +309,44 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
existing, ok = opts.LastResult.Value.(*structs.IssuedCert)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last value wrong type: %T", req)
"Internal cache failure: last value wrong type: %T", opts.LastResult.Value)
}
state, ok = opts.LastResult.State.(fetchState)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last state wrong type: %T", req)
if opts.LastResult.State != nil {
state, ok = opts.LastResult.State.(fetchState)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last state wrong type: %T", opts.LastResult.State)
}
}
} else {
state = fetchState{}
}

// Handle brand new request first as it's simplest.
if existing == nil {
return c.generateNewLeaf(reqReal, &state)
return c.generateNewLeaf(reqReal, result)
}

// Setup result to mirror the current value in case we time out or hit a rate
// limit. This allows us to update the state (e.g. for backoff or retry
// coordination on root change) even if we don't get a new cert.
result.Value = existing
result.Index = existing.ModifyIndex
result.State = state

// Since state is not a pointer, we can't just set it once in result and then
// continue to update it later since we will be updating only our copy.
// Instead we have a helper function that is used to make sure the state is
// updated in the result when we return.
lastResultWithNewState := func() cache.FetchResult {
return cache.FetchResult{
Value: existing,
Index: existing.ModifyIndex,
State: state,
}
}

// Beyond this point we need to only return lastResultWithNewState(), not just
// result, since otherwise we might lose state updates we expect to keep.

// We have a certificate in cache already. Check it's still valid.
now := time.Now()
minExpire, maxExpire := calculateSoftExpiry(now, existing)
@@ -306,7 +360,7 @@

if expiresAt == now || expiresAt.Before(now) {
// Already expired, just make a new one right away
return c.generateNewLeaf(reqReal, &state)
return c.generateNewLeaf(reqReal, lastResultWithNewState())
}

// We are about to block and wait for a change or timeout.
@@ -318,16 +372,18 @@
// reload latest CA from cache.
rootUpdateCh := make(chan struct{}, 1)

// The roots may have changed in between blocking calls. We need to verify
// that the existing cert was signed by the current root. If it was we still
// want to do the whole jitter thing. We could code that again here but it's
// identical to the select case below so we just trigger our own update chan
// and let the logic below handle checking if the CA actually changed; in the
// common case where it didn't, this is a no-op anyway.
rootUpdateCh <- struct{}{}

// Subscribe our chan to get root update notification.
c.fetchStart(rootUpdateCh)
defer c.fetchDone(rootUpdateCh)

// Setup result to mirror the current value in case we time out. This allows us
// to update the state even if we don't generate a new cert.
result.Value = existing
result.Index = existing.ModifyIndex
result.State = state

// Setup the timeout chan outside the loop so we don't keep bumping the timeout
// later if we loop around.
timeoutCh := time.After(opts.Timeout)
@@ -341,31 +397,35 @@
select {
case <-timeoutCh:
// We timed out the request with same cert.
return result, nil
return lastResultWithNewState(), nil

case <-expiresCh:
// Cert expired or was force-expired by a root change.
return c.generateNewLeaf(reqReal, &state)
return c.generateNewLeaf(reqReal, lastResultWithNewState())

case <-rootUpdateCh:
// A root cache change occurred, reload roots from cache.
roots, err := c.rootsFromCache()
if err != nil {
return result, err
return lastResultWithNewState(), err
}

// Handle _possibly_ changed roots. We still need to verify the new active
// root is not the same as the one our current cert was signed by since we
// can be notified spuriously if we are the first request since the
// rootsWatcher didn't know about the CA we were signed by.
// rootsWatcher didn't know about the CA we were signed by. We also rely
// on this on every request to do the initial check that the current roots
// are the same ones the current cert was signed by.
if activeRootHasKey(roots, state.authorityKeyID) {
// Current active CA is the same one that signed our current cert so
// keep waiting for a change.
continue
}
state.activeRootRotationStart = time.Now()

// CA root changed. We add some jitter here to avoid a thundering herd.
// See docs on caChangeInitialSpreadDefault const.
delay := lib.RandomStagger(caChangeInitialSpreadDefault)
// See docs on caChangeJitterWindow const.
delay := lib.RandomStagger(caChangeJitterWindow)
if c.TestOverrideCAChangeInitialDelay > 0 {
delay = c.TestOverrideCAChangeInitialDelay
}
@@ -374,7 +434,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// the cache state so the next request will notice we still need to renew
// and do it at the right time. This is cleared once a new cert is
// returned by generateNewLeaf.
state.forceExpireAfter = time.Now().Add(delay)
state.forceExpireAfter = state.activeRootRotationStart.Add(delay)
// If the delay time is within the current timeout, we want to renew the
// cert as soon as it's up. We change the expire time and chan so that when we
// loop back around, we'll wait at most delay until generating a new cert.
@@ -416,9 +476,20 @@ func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
}

// generateNewLeaf does the actual work of creating a new private key,
// generating a CSR and getting it signed by the servers.
func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchState) (cache.FetchResult, error) {
var result cache.FetchResult
// generating a CSR and getting it signed by the servers. The result argument
// represents the last result currently in cache, if any, along with its state.
func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
result cache.FetchResult) (cache.FetchResult, error) {

var state fetchState
if result.State != nil {
var ok bool
state, ok = result.State.(fetchState)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: result state wrong type: %T", result.State)
}
}

// Need to look up the RootCAs response to discover the trust domain. This
// should be a cache hit.
@@ -458,12 +529,55 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchS
CSR: csr,
}
if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil {
if err.Error() == consul.ErrRateLimited.Error() {
if result.Value == nil {
// This was a first fetch - we have no good value in cache. In this case
// we just return the error to the caller rather than rely on the
// surprising semi-blocking behavior of waiting until the rate limit is
// appeased or we time out. It's likely the caller isn't expecting this to
// block since it's an initial fetch. This also massively simplifies this
// edge case.
return result, err
}

if state.activeRootRotationStart.IsZero() {
// We hit a rate limit error by chance - for example a cert expired
// before the root rotation was observed (not triggered by rotation) but
// while the server is working through high load from a recent rotation.
// Just pretend there is a rotation and the retry logic here will start
// jittering and retrying in the same way from now.
state.activeRootRotationStart = time.Now()
}

// Increment the errors in the state
state.consecutiveRateLimitErrs++

delay := lib.RandomStagger(caChangeJitterWindow)
if c.TestOverrideCAChangeInitialDelay > 0 {
delay = c.TestOverrideCAChangeInitialDelay
}

// Find the start of the next window we can retry in. See comment on
// caChangeJitterWindow for details of why we use this strategy.
windowStart := state.activeRootRotationStart.Add(
time.Duration(state.consecutiveRateLimitErrs) * delay)

// Pick a random time in that window
state.forceExpireAfter = windowStart.Add(delay)

// Return a result with the existing cert but the new state - the cache
// will see this as no change. Note that we always have an existing result
// here due to the nil value check above.
result.State = state
return result, nil
}
return result, err
}
reply.PrivateKeyPEM = pkPEM

// Reset the forcedExpiry in the state
// Reset rotation state
state.forceExpireAfter = time.Time{}
state.consecutiveRateLimitErrs = 0
state.activeRootRotationStart = time.Time{}

cert, err := connect.ParseCert(reply.CertPEM)
if err != nil {
Expand All @@ -475,7 +589,7 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchS
result.Value = &reply
// Store value not pointer so we don't accidentally mutate the cache entry
// state in Fetch.
result.State = *state
result.State = state
result.Index = reply.ModifyIndex
return result, nil
}
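For a rough sense of how the per-window retry strategy drains a backlog after a rotation, here is a back-of-envelope simulation (illustrative only: the per-window capacity is a made-up stand-in for the server's CSR rate limit, and real retries land at random points within each window rather than all being admitted up-front):

	package main

	import "fmt"

	func main() {
		remaining := 10000 // certs still needing re-signing after a rotation
		capacity := 3000   // hypothetical CSRs the servers admit per 30s window
		for window := 1; remaining > 0; window++ {
			signed := capacity
			if remaining < capacity {
				signed = remaining
			}
			remaining -= signed
			fmt.Printf("window %d: signed %d, %d remaining\n", window, signed, remaining)
		}
	}

With these numbers every cert rotates within four 30-second windows (about two minutes), and each rate-limited cert issues at most one extra RPC per window, in contrast to the bursty load of per-client exponential backoff.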