Skip to content

Commit

Permalink
Merge pull request #1618 from aaronlehmann/external-ca-reconcilation
Browse files Browse the repository at this point in the history
ca: Retry certificate issuance on non-fatal errors
  • Loading branch information
diogomonica authored Oct 18, 2016
2 parents a3e5722 + 0de4300 commit ee5b04e
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 34 deletions.
12 changes: 11 additions & 1 deletion ca/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,23 @@ const (
MinNodeCertExpiration = 1 * time.Hour
)

// A recoverableErr is a non-fatal error encountered while signing a
// certificate, which means that the certificate issuance may be retried at a
// later time.
type recoverableErr struct {
// err is the underlying error that is being marked as recoverable.
err error
}

// Error implements the error interface by returning the message of the
// wrapped error unchanged.
func (r recoverableErr) Error() string {
return r.err.Error()
}

// ErrNoLocalRootCA is an error type used to indicate that the local root CA
// certificate file does not exist.
var ErrNoLocalRootCA = errors.New("local root CA certificate does not exist")

// ErrNoValidSigner is an error type used to indicate that our RootCA doesn't have the ability to
// sign certificates.
var ErrNoValidSigner = errors.New("no valid signer found")
var ErrNoValidSigner = recoverableErr{err: errors.New("no valid signer found")}

func init() {
cflog.Level = 5
Expand Down
8 changes: 4 additions & 4 deletions ca/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,23 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
func makeExternalSignRequest(client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
resp, err := client.Post(url, "application/json", bytes.NewReader(csrJSON))
if err != nil {
return nil, errors.Wrap(err, "unable to perform certificate signing request")
return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")}
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "unable to read CSR response body")
return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")}
}

if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("unexpected status code in CSR response: %d - %s", resp.StatusCode, string(body))
return nil, recoverableErr{err: errors.Errorf("unexpected status code in CSR response: %d - %s", resp.StatusCode, string(body))}
}

var apiResponse api.Response
if err := json.Unmarshal(body, &apiResponse); err != nil {
log.Debugf("unable to JSON-parse CFSSL API response body: %s", string(body))
return nil, errors.Wrap(err, "unable to parse JSON response")
return nil, recoverableErr{err: errors.Wrap(err, "unable to parse JSON response")}
}

if !apiResponse.Success || apiResponse.Result == nil {
Expand Down
97 changes: 68 additions & 29 deletions ca/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ca
import (
"crypto/subtle"
"sync"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
Expand All @@ -17,18 +18,27 @@ import (
"google.golang.org/grpc/codes"
)

const (
// defaultReconciliationRetryInterval is the reconciliation retry interval
// that NewServer assigns to a new Server. Run uses the server's interval as
// the period of the ticker that re-attempts signing for pending nodes.
defaultReconciliationRetryInterval = 10 * time.Second
)

// Server is the CA and NodeCA API gRPC server.
// TODO(diogo): At some point we may want to have separate implementations of
// TODO(aaronl): At some point we may want to have separate implementations of
// CA, NodeCA, and other hypothetical future CA services. At the moment,
// breaking it apart doesn't seem worth it.
type Server struct {
mu sync.Mutex
wg sync.WaitGroup
ctx context.Context
cancel func()
store *store.MemoryStore
securityConfig *SecurityConfig
joinTokens *api.JoinTokens
mu sync.Mutex
wg sync.WaitGroup
ctx context.Context
cancel func()
store *store.MemoryStore
securityConfig *SecurityConfig
joinTokens *api.JoinTokens
reconciliationRetryInterval time.Duration

// pending is a map of nodes with a pending certificate issuance or
// renewal. It is indexed by node ID.
pending map[string]*api.Node

// Started is a channel which gets closed once the server is running
// and able to service RPCs.
Expand All @@ -45,12 +55,20 @@ func DefaultCAConfig() api.CAConfig {
// NewServer creates a CA API server.
func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server {
return &Server{
store: store,
securityConfig: securityConfig,
started: make(chan struct{}),
store: store,
securityConfig: securityConfig,
pending: make(map[string]*api.Node),
started: make(chan struct{}),
reconciliationRetryInterval: defaultReconciliationRetryInterval,
}
}

// SetReconciliationRetryInterval changes the time interval between
// reconciliation attempts. This function must be called before Run: Run
// creates its ticker from this value once, before entering its event loop, so
// a later change is not picked up by a running server. The field is written
// without any locking. The interval must be positive, because Run passes it
// to time.NewTicker, which panics on a non-positive duration.
func (s *Server) SetReconciliationRetryInterval(reconciliationRetryInterval time.Duration) {
s.reconciliationRetryInterval = reconciliationRetryInterval
}

// NodeCertificateStatus returns the current issuance status of an issuance request identified by the nodeID
func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCertificateStatusRequest) (*api.NodeCertificateStatusResponse, error) {
if request.NodeID == "" {
Expand Down Expand Up @@ -240,7 +258,6 @@ func (s *Server) issueRenewCertificate(ctx context.Context, nodeID string, csr [
node *api.Node
)
err := s.store.Update(func(tx store.Tx) error {

// Attempt to retrieve the node with nodeID
node = store.GetNode(tx, nodeID)
if node == nil {
Expand Down Expand Up @@ -356,6 +373,9 @@ func (s *Server) Run(ctx context.Context) error {
}).WithError(err).Errorf("error attempting to reconcile certificates")
}

ticker := time.NewTicker(s.reconciliationRetryInterval)
defer ticker.Stop()

// Watch for new nodes being created, new nodes being updated, and changes
// to the cluster
for {
Expand All @@ -373,7 +393,16 @@ func (s *Server) Run(ctx context.Context) error {
case state.EventUpdateCluster:
s.updateCluster(ctx, v.Cluster)
}

case <-ticker.C:
for _, node := range s.pending {
if err := s.evaluateAndSignNodeCert(ctx, node); err != nil {
// If this sign operation did not succeed, the rest are
// unlikely to. Yield so that we don't hammer an external CA.
// Since the map iteration order is randomized, there is no
// risk of getting stuck on a problematic CSR.
break
}
}
case <-ctx.Done():
return ctx.Err()
case <-s.ctx.Done():
Expand Down Expand Up @@ -493,28 +522,29 @@ func (s *Server) updateCluster(ctx context.Context, cluster *api.Cluster) {
}

// evaluateAndSignNodeCert implements the logic of which certificates to sign
func (s *Server) evaluateAndSignNodeCert(ctx context.Context, node *api.Node) {
func (s *Server) evaluateAndSignNodeCert(ctx context.Context, node *api.Node) error {
// If the desired membership and actual state are in sync, there's
// nothing to do.
if node.Spec.Membership == api.NodeMembershipAccepted && node.Certificate.Status.State == api.IssuanceStateIssued {
return
return nil
}

// If the certificate state is renew, then it is a server-sided accepted cert (cert renewals)
if node.Certificate.Status.State == api.IssuanceStateRenew {
s.signNodeCert(ctx, node)
return
return s.signNodeCert(ctx, node)
}

// Sign this certificate if a user explicitly changed it to Accepted, and
// the certificate is in pending state
if node.Spec.Membership == api.NodeMembershipAccepted && node.Certificate.Status.State == api.IssuanceStatePending {
s.signNodeCert(ctx, node)
return s.signNodeCert(ctx, node)
}

return nil
}

// signNodeCert does the bulk of the work for signing a certificate
func (s *Server) signNodeCert(ctx context.Context, node *api.Node) {
func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error {
rootCA := s.securityConfig.RootCA()
externalCA := s.securityConfig.externalCA

Expand All @@ -527,9 +557,11 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) {
"node.id": node.ID,
"method": "(*Server).signNodeCert",
}).WithError(err).Errorf("failed to parse role")
return
return errors.New("failed to parse role")
}

s.pending[node.ID] = node

// Attempt to sign the CSR
var (
rawCSR = node.Certificate.CSR
Expand All @@ -550,16 +582,19 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) {
"node.id": node.ID,
"method": "(*Server).signNodeCert",
}).WithError(err).Errorf("failed to sign CSR")
// If this error is due the lack of signer, maybe some other
// manager in the future will pick it up. Return without
// changing the state of the certificate.
if err == ErrNoValidSigner {
return
}

// If the current state is already Failed, no need to change it
if node.Certificate.Status.State == api.IssuanceStateFailed {
return
delete(s.pending, node.ID)
return errors.New("failed to sign CSR")
}

if _, ok := err.(recoverableErr); ok {
// Return without changing the state of the certificate. We may
// retry signing it in the future.
return errors.New("failed to sign CSR")
}

// We failed to sign this CSR, change the state to FAILED
err = s.store.Update(func(tx store.Tx) error {
node := store.GetNode(tx, nodeID)
Expand All @@ -580,7 +615,9 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) {
"method": "(*Server).signNodeCert",
}).WithError(err).Errorf("transaction failed when setting state to FAILED")
}
return

delete(s.pending, node.ID)
return errors.New("failed to sign CSR")
}

// We were able to successfully sign the new CSR. Let's try to update the nodeStore
Expand All @@ -606,6 +643,7 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) {
"node.role": node.Certificate.Role,
"method": "(*Server).signNodeCert",
}).Debugf("certificate issued")
delete(s.pending, node.ID)
break
}
if err == store.ErrSequenceConflict {
Expand All @@ -616,8 +654,9 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) {
"node.id": nodeID,
"method": "(*Server).signNodeCert",
}).WithError(err).Errorf("transaction failed")
return
return errors.New("transaction failed")
}
return nil
}

// reconcileNodeCertificates is a helper method that calls evaluateAndSignNodeCert on all the
Expand Down
32 changes: 32 additions & 0 deletions ca/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,38 @@ func TestIssueNodeCertificate(t *testing.T) {
assert.Equal(t, api.NodeRoleWorker, statusResponse.Certificate.Role)
}

// TestIssueNodeCertificateBrokenCA checks that a node certificate is still
// issued when the external CA fails at first and recovers shortly afterwards.
// It only runs in the external CA test configuration.
func TestIssueNodeCertificateBrokenCA(t *testing.T) {
	if !testutils.External {
		t.Skip("test only applicable for external CA configuration")
	}

	tc := testutils.NewTestCA(t)
	defer tc.Stop()

	// Without a CSR the rest of the test is meaningless, so stop right away.
	csr, _, err := ca.GenerateAndWriteNewKey(tc.Paths.Node)
	require.NoError(t, err)

	// Put the external signing server into its failing mode before the
	// request is made, and restore it a little later from the background.
	tc.ExternalSigningServer.Flake()

	go func() {
		time.Sleep(250 * time.Millisecond)
		tc.ExternalSigningServer.Deflake()
	}()

	issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: tc.WorkerToken}
	issueResponse, err := tc.NodeCAClients[0].IssueNodeCertificate(context.Background(), issueRequest)
	// issueResponse is dereferenced below, so a failed RPC must abort the
	// test here instead of leading to a nil pointer panic.
	require.NoError(t, err)
	// NodeID is a string: NotNil would always pass, NotEmpty checks that an
	// ID was actually assigned.
	assert.NotEmpty(t, issueResponse.NodeID)
	assert.Equal(t, api.NodeMembershipAccepted, issueResponse.NodeMembership)

	// NOTE(review): this presumably relies on NodeCertificateStatus waiting
	// until the certificate leaves the pending state - confirm against the
	// server implementation.
	statusRequest := &api.NodeCertificateStatusRequest{NodeID: issueResponse.NodeID}
	statusResponse, err := tc.NodeCAClients[0].NodeCertificateStatus(context.Background(), statusRequest)
	require.NoError(t, err)
	assert.Equal(t, api.IssuanceStateIssued, statusResponse.Status.State)
	assert.NotNil(t, statusResponse.Certificate.Certificate)
	assert.Equal(t, api.NodeRoleWorker, statusResponse.Certificate.Role)
}

func TestIssueNodeCertificateWithInvalidCSR(t *testing.T) {
tc := testutils.NewTestCA(t)
defer tc.Stop()
Expand Down
1 change: 1 addition & 0 deletions ca/testutils/cautils.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func NewTestCA(t *testing.T) *TestCA {

createClusterObject(t, s, organization, workerToken, managerToken, externalCAs...)
caServer := ca.NewServer(s, managerConfig)
caServer.SetReconciliationRetryInterval(50 * time.Millisecond)
api.RegisterCAServer(grpcServer, caServer)
api.RegisterNodeCAServer(grpcServer, caServer)

Expand Down
17 changes: 17 additions & 0 deletions ca/testutils/externalutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func NewExternalSigningServer(rootCA ca.RootCA, basedir string) (*ExternalSignin
handler := &signHandler{
numIssued: &ess.NumIssued,
rootCA: rootCA,
flaky: &ess.flaky,
}
mux.Handle(signURL.Path, handler)

Expand All @@ -85,19 +86,35 @@ type ExternalSigningServer struct {
listener net.Listener
NumIssued uint64
URL string
flaky uint32
}

// Stop stops this signing server by closing the underlying TCP/TLS listener.
// It returns the error reported by the listener's Close method, if any.
func (ess *ExternalSigningServer) Stop() error {
return ess.listener.Close()
}

// Flake makes the signing server return HTTP 500 errors. It atomically sets
// the flaky flag to 1; the sign handler shares this flag through a pointer and
// loads it atomically at the start of every request. The effect lasts until
// Deflake is called.
func (ess *ExternalSigningServer) Flake() {
atomic.StoreUint32(&ess.flaky, 1)
}

// Deflake restores normal operation after a call to Flake by atomically
// resetting the flaky flag to 0.
func (ess *ExternalSigningServer) Deflake() {
atomic.StoreUint32(&ess.flaky, 0)
}

// signHandler is the HTTP handler that NewExternalSigningServer registers on
// the signing URL path.
type signHandler struct {
// numIssued points at the owning ExternalSigningServer's NumIssued counter.
numIssued *uint64
// rootCA is the root CA handed to NewExternalSigningServer.
rootCA ca.RootCA
// flaky points at the owning ExternalSigningServer's flaky flag. ServeHTTP
// reads it atomically; a value of 1 makes it write an HTTP 500 status.
flaky *uint32
}

func (h *signHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if atomic.LoadUint32(h.flaky) == 1 {
w.WriteHeader(http.StatusInternalServerError)
}

// Check client authentication via mutual TLS.
if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
cfsslErr := cfsslerrors.New(cfsslerrors.APIClientError, cfsslerrors.AuthenticationFailure)
Expand Down

0 comments on commit ee5b04e

Please sign in to comment.