From 0de4300e1f65d3bc268a175b1b723544834d1e2e Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 11 Oct 2016 17:25:24 -0700 Subject: [PATCH] ca: Retry certificate issuance on non-fatal errors When there is no signer, or when using an external CA that's unreachable or not responding properly, add the node to a list of pending certificate requests. Every 10 seconds, run through this list and try to issue the certificates again. Signed-off-by: Aaron Lehmann --- ca/certificates.go | 12 ++++- ca/external.go | 8 +-- ca/server.go | 97 ++++++++++++++++++++++++----------- ca/server_test.go | 32 ++++++++++++ ca/testutils/cautils.go | 1 + ca/testutils/externalutils.go | 17 ++++++ 6 files changed, 133 insertions(+), 34 deletions(-) diff --git a/ca/certificates.go b/ca/certificates.go index a29dbda245..dd4ddd037c 100644 --- a/ca/certificates.go +++ b/ca/certificates.go @@ -69,13 +69,23 @@ const ( MinNodeCertExpiration = 1 * time.Hour ) +// A recoverableErr is an non-fatal error encountered signing a certificate, +// which means that the certificate issuance may be retried at a later time. +type recoverableErr struct { + err error +} + +func (r recoverableErr) Error() string { + return r.err.Error() +} + // ErrNoLocalRootCA is an error type used to indicate that the local root CA // certificate file does not exist. var ErrNoLocalRootCA = errors.New("local root CA certificate does not exist") // ErrNoValidSigner is an error type used to indicate that our RootCA doesn't have the ability to // sign certificates. -var ErrNoValidSigner = errors.New("no valid signer found") +var ErrNoValidSigner = recoverableErr{err: errors.New("no valid signer found")} func init() { cflog.Level = 5 diff --git a/ca/external.go b/ca/external.go index 48dbf81725..e728961c03 100644 --- a/ca/external.go +++ b/ca/external.go @@ -99,23 +99,23 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) { func makeExternalSignRequest(client *http.Client, url string, csrJSON []byte) (cert []byte, err error) { resp, err := client.Post(url, "application/json", bytes.NewReader(csrJSON)) if err != nil { - return nil, errors.Wrap(err, "unable to perform certificate signing request") + return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")} } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, errors.Wrap(err, "unable to read CSR response body") + return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")} } if resp.StatusCode != http.StatusOK { - return nil, errors.Errorf("unexpected status code in CSR response: %d - %s", resp.StatusCode, string(body)) + return nil, recoverableErr{err: errors.Errorf("unexpected status code in CSR response: %d - %s", resp.StatusCode, string(body))} } var apiResponse api.Response if err := json.Unmarshal(body, &apiResponse); err != nil { log.Debugf("unable to JSON-parse CFSSL API response body: %s", string(body)) - return nil, errors.Wrap(err, "unable to parse JSON response") + return nil, recoverableErr{err: errors.Wrap(err, "unable to parse JSON response")} } if !apiResponse.Success || apiResponse.Result == nil { diff --git a/ca/server.go b/ca/server.go index ebd3047ba2..4398783545 100644 --- a/ca/server.go +++ b/ca/server.go @@ -3,6 +3,7 @@ package ca import ( "crypto/subtle" "sync" + "time" "github.com/Sirupsen/logrus" "github.com/docker/swarmkit/api" @@ -17,18 +18,27 @@ import ( "google.golang.org/grpc/codes" ) +const ( + defaultReconciliationRetryInterval = 10 * time.Second +) + // Server is the CA and NodeCA API gRPC server. -// TODO(diogo): At some point we may want to have separate implementations of +// TODO(aaronl): At some point we may want to have separate implementations of // CA, NodeCA, and other hypothetical future CA services. At the moment, // breaking it apart doesn't seem worth it. type Server struct { - mu sync.Mutex - wg sync.WaitGroup - ctx context.Context - cancel func() - store *store.MemoryStore - securityConfig *SecurityConfig - joinTokens *api.JoinTokens + mu sync.Mutex + wg sync.WaitGroup + ctx context.Context + cancel func() + store *store.MemoryStore + securityConfig *SecurityConfig + joinTokens *api.JoinTokens + reconciliationRetryInterval time.Duration + + // pending is a map of nodes with pending certificates issuance or + // renewal. They are indexed by node ID. + pending map[string]*api.Node // Started is a channel which gets closed once the server is running // and able to service RPCs. @@ -45,12 +55,20 @@ func DefaultCAConfig() api.CAConfig { // NewServer creates a CA API server. func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig) *Server { return &Server{ - store: store, - securityConfig: securityConfig, - started: make(chan struct{}), + store: store, + securityConfig: securityConfig, + pending: make(map[string]*api.Node), + started: make(chan struct{}), + reconciliationRetryInterval: defaultReconciliationRetryInterval, } } +// SetReconciliationRetryInterval changes the time interval between +// reconciliation attempts. This function must be called before Run. +func (s *Server) SetReconciliationRetryInterval(reconciliationRetryInterval time.Duration) { + s.reconciliationRetryInterval = reconciliationRetryInterval +} + // NodeCertificateStatus returns the current issuance status of an issuance request identified by the nodeID func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCertificateStatusRequest) (*api.NodeCertificateStatusResponse, error) { if request.NodeID == "" { @@ -240,7 +258,6 @@ func (s *Server) issueRenewCertificate(ctx context.Context, nodeID string, csr [ node *api.Node ) err := s.store.Update(func(tx store.Tx) error { - // Attempt to retrieve the node with nodeID node = store.GetNode(tx, nodeID) if node == nil { @@ -356,6 +373,9 @@ func (s *Server) Run(ctx context.Context) error { }).WithError(err).Errorf("error attempting to reconcile certificates") } + ticker := time.NewTicker(s.reconciliationRetryInterval) + defer ticker.Stop() + // Watch for new nodes being created, new nodes being updated, and changes // to the cluster for { @@ -373,7 +393,16 @@ func (s *Server) Run(ctx context.Context) error { case state.EventUpdateCluster: s.updateCluster(ctx, v.Cluster) } - + case <-ticker.C: + for _, node := range s.pending { + if err := s.evaluateAndSignNodeCert(ctx, node); err != nil { + // If this sign operation did not succeed, the rest are + // unlikely to. Yield so that we don't hammer an external CA. + // Since the map iteration order is randomized, there is no + // risk of getting stuck on a problematic CSR. + break + } + } case <-ctx.Done(): return ctx.Err() case <-s.ctx.Done(): @@ -493,28 +522,29 @@ func (s *Server) updateCluster(ctx context.Context, cluster *api.Cluster) { } // evaluateAndSignNodeCert implements the logic of which certificates to sign -func (s *Server) evaluateAndSignNodeCert(ctx context.Context, node *api.Node) { +func (s *Server) evaluateAndSignNodeCert(ctx context.Context, node *api.Node) error { // If the desired membership and actual state are in sync, there's // nothing to do. if node.Spec.Membership == api.NodeMembershipAccepted && node.Certificate.Status.State == api.IssuanceStateIssued { - return + return nil } // If the certificate state is renew, then it is a server-sided accepted cert (cert renewals) if node.Certificate.Status.State == api.IssuanceStateRenew { - s.signNodeCert(ctx, node) - return + return s.signNodeCert(ctx, node) } // Sign this certificate if a user explicitly changed it to Accepted, and // the certificate is in pending state if node.Spec.Membership == api.NodeMembershipAccepted && node.Certificate.Status.State == api.IssuanceStatePending { - s.signNodeCert(ctx, node) + return s.signNodeCert(ctx, node) } + + return nil } // signNodeCert does the bulk of the work for signing a certificate -func (s *Server) signNodeCert(ctx context.Context, node *api.Node) { +func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error { rootCA := s.securityConfig.RootCA() externalCA := s.securityConfig.externalCA @@ -527,9 +557,11 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) { "node.id": node.ID, "method": "(*Server).signNodeCert", }).WithError(err).Errorf("failed to parse role") - return + return errors.New("failed to parse role") } + s.pending[node.ID] = node + // Attempt to sign the CSR var ( rawCSR = node.Certificate.CSR @@ -550,16 +582,19 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) { "node.id": node.ID, "method": "(*Server).signNodeCert", }).WithError(err).Errorf("failed to sign CSR") - // If this error is due the lack of signer, maybe some other - // manager in the future will pick it up. Return without - // changing the state of the certificate. - if err == ErrNoValidSigner { - return - } + // If the current state is already Failed, no need to change it if node.Certificate.Status.State == api.IssuanceStateFailed { - return + delete(s.pending, node.ID) + return errors.New("failed to sign CSR") } + + if _, ok := err.(recoverableErr); ok { + // Return without changing the state of the certificate. We may + // retry signing it in the future. + return errors.New("failed to sign CSR") + } + // We failed to sign this CSR, change the state to FAILED err = s.store.Update(func(tx store.Tx) error { node := store.GetNode(tx, nodeID) @@ -580,7 +615,9 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) { "method": "(*Server).signNodeCert", }).WithError(err).Errorf("transaction failed when setting state to FAILED") } - return + + delete(s.pending, node.ID) + return errors.New("failed to sign CSR") } // We were able to successfully sign the new CSR. Let's try to update the nodeStore @@ -606,6 +643,7 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) { "node.role": node.Certificate.Role, "method": "(*Server).signNodeCert", }).Debugf("certificate issued") + delete(s.pending, node.ID) break } if err == store.ErrSequenceConflict { @@ -616,8 +654,9 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) { "node.id": nodeID, "method": "(*Server).signNodeCert", }).WithError(err).Errorf("transaction failed") - return + return errors.New("transaction failed") } + return nil } // reconcileNodeCertificates is a helper method that calles evaluateAndSignNodeCert on all the diff --git a/ca/server_test.go b/ca/server_test.go index 857a437868..c460fe1010 100644 --- a/ca/server_test.go +++ b/ca/server_test.go @@ -60,6 +60,38 @@ func TestIssueNodeCertificate(t *testing.T) { assert.Equal(t, api.NodeRoleWorker, statusResponse.Certificate.Role) } +func TestIssueNodeCertificateBrokenCA(t *testing.T) { + if !testutils.External { + t.Skip("test only applicable for external CA configuration") + } + + tc := testutils.NewTestCA(t) + defer tc.Stop() + + csr, _, err := ca.GenerateAndWriteNewKey(tc.Paths.Node) + assert.NoError(t, err) + + tc.ExternalSigningServer.Flake() + + go func() { + time.Sleep(250 * time.Millisecond) + tc.ExternalSigningServer.Deflake() + }() + issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: tc.WorkerToken} + issueResponse, err := tc.NodeCAClients[0].IssueNodeCertificate(context.Background(), issueRequest) + assert.NoError(t, err) + assert.NotNil(t, issueResponse.NodeID) + assert.Equal(t, api.NodeMembershipAccepted, issueResponse.NodeMembership) + + statusRequest := &api.NodeCertificateStatusRequest{NodeID: issueResponse.NodeID} + statusResponse, err := tc.NodeCAClients[0].NodeCertificateStatus(context.Background(), statusRequest) + require.NoError(t, err) + assert.Equal(t, api.IssuanceStateIssued, statusResponse.Status.State) + assert.NotNil(t, statusResponse.Certificate.Certificate) + assert.Equal(t, api.NodeRoleWorker, statusResponse.Certificate.Role) + +} + func TestIssueNodeCertificateWithInvalidCSR(t *testing.T) { tc := testutils.NewTestCA(t) defer tc.Stop() diff --git a/ca/testutils/cautils.go b/ca/testutils/cautils.go index b2c8175094..089c7f6447 100644 --- a/ca/testutils/cautils.go +++ b/ca/testutils/cautils.go @@ -161,6 +161,7 @@ func NewTestCA(t *testing.T) *TestCA { createClusterObject(t, s, organization, workerToken, managerToken, externalCAs...) caServer := ca.NewServer(s, managerConfig) + caServer.SetReconciliationRetryInterval(50 * time.Millisecond) api.RegisterCAServer(grpcServer, caServer) api.RegisterNodeCAServer(grpcServer, caServer) diff --git a/ca/testutils/externalutils.go b/ca/testutils/externalutils.go index 58198a9ff6..6b91bfd60b 100644 --- a/ca/testutils/externalutils.go +++ b/ca/testutils/externalutils.go @@ -67,6 +67,7 @@ func NewExternalSigningServer(rootCA ca.RootCA, basedir string) (*ExternalSignin handler := &signHandler{ numIssued: &ess.NumIssued, rootCA: rootCA, + flaky: &ess.flaky, } mux.Handle(signURL.Path, handler) @@ -85,6 +86,7 @@ type ExternalSigningServer struct { listener net.Listener NumIssued uint64 URL string + flaky uint32 } // Stop stops this signing server by closing the underlying TCP/TLS listener. @@ -92,12 +94,27 @@ func (ess *ExternalSigningServer) Stop() error { return ess.listener.Close() } +// Flake makes the signing server return HTTP 500 errors. +func (ess *ExternalSigningServer) Flake() { + atomic.StoreUint32(&ess.flaky, 1) +} + +// Deflake restores normal operation after a call to Flake. +func (ess *ExternalSigningServer) Deflake() { + atomic.StoreUint32(&ess.flaky, 0) +} + type signHandler struct { numIssued *uint64 rootCA ca.RootCA + flaky *uint32 } func (h *signHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if atomic.LoadUint32(h.flaky) == 1 { + w.WriteHeader(http.StatusInternalServerError) + } + // Check client authentication via mutual TLS. if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 { cfsslErr := cfsslerrors.New(cfsslerrors.APIClientError, cfsslerrors.AuthenticationFailure)