Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert code to use connectionbroker package #1851

Merged
merged 1 commit into from
Jan 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,10 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
seen := map[api.Peer]struct{}{}
for _, manager := range message.Managers {
if manager.Peer.Addr == "" {
log.G(ctx).WithField("manager.addr", manager.Peer.Addr).
Warnf("skipping bad manager address")
continue
}

a.config.Managers.Observe(*manager.Peer, int(manager.Weight))
a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
seen[*manager.Peer] = struct{}{}
}

Expand All @@ -358,9 +356,9 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
}

// prune managers not in list.
for peer := range a.config.Managers.Weights() {
for peer := range a.config.ConnBroker.Remotes().Weights() {
if _, ok := seen[peer]; !ok {
a.config.Managers.Remove(peer)
a.config.ConnBroker.Remotes().Remove(peer)
}
}

Expand Down Expand Up @@ -468,7 +466,7 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
)

err = a.withSession(ctx, func(session *session) error {
publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
return err
})
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/remotes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestAgentStartStop(t *testing.T) {

agent, err := New(&Config{
Executor: &NoopExecutor{},
Managers: remotes,
ConnBroker: connectionbroker.New(remotes),
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
})
Expand Down Expand Up @@ -147,7 +148,7 @@ func agentTestEnv(t *testing.T) (*Agent, func()) {

agent, err := New(&Config{
Executor: &NoopExecutor{},
Managers: remotes,
ConnBroker: connectionbroker.New(remotes),
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
})
Expand Down
8 changes: 4 additions & 4 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/boltdb/bolt"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/connectionbroker"
"github.com/pkg/errors"
"google.golang.org/grpc/credentials"
)
Expand All @@ -14,9 +14,9 @@ type Config struct {
// Hostname the name of host for agent instance.
Hostname string

// Managers provides the manager backend used by the agent. It will be
// updated with managers weights as observed by the agent.
Managers remotes.Remotes
// ConnBroker provides a connection broker for retrieving gRPC
// connections to managers.
ConnBroker *connectionbroker.Broker

// Executor specifies the executor to use for the agent.
Executor exec.Executor
Expand Down
4 changes: 2 additions & 2 deletions agent/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ResourceAllocator interface {
func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error) {
var taskID string
if err := r.agent.withSession(ctx, func(session *session) error {
client := api.NewResourceAllocatorClient(session.conn)
client := api.NewResourceAllocatorClient(session.conn.ClientConn)
r, err := client.AttachNetwork(ctx, &api.AttachNetworkRequest{
Config: &api.NetworkAttachmentConfig{
Target: target,
Expand All @@ -53,7 +53,7 @@ func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string
// DetachNetwork deletes a network attachment.
func (r *resourceAllocator) DetachNetwork(ctx context.Context, aID string) error {
return r.agent.withSession(ctx, func(session *session) error {
client := api.NewResourceAllocatorClient(session.conn)
client := api.NewResourceAllocatorClient(session.conn.ClientConn)
_, err := client.DetachNetwork(ctx, &api.DetachNetworkRequest{
AttachmentID: aID,
})
Expand Down
28 changes: 10 additions & 18 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/remotes"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -30,8 +30,7 @@ var (
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
conn *grpc.ClientConn
addr string
conn *connectionbroker.Conn

agent *Agent
sessionID string
Expand Down Expand Up @@ -61,20 +60,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
// TODO(stevvooe): Need to move connection management up a level or create
// independent connection for log broker client.

peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
return s
}
cc, err := grpc.Dial(peer.Addr,
cc, err := agent.config.ConnBroker.Select(
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
if err != nil {
s.errs <- err
return s
}
s.addr = peer.Addr
s.conn = cc

go s.run(ctx, delay, description)
Expand Down Expand Up @@ -127,7 +120,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)

stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
Expand Down Expand Up @@ -160,7 +153,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e

func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()

Expand Down Expand Up @@ -224,7 +217,7 @@ func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")

client := api.NewLogBrokerClient(s.conn)
client := api.NewLogBrokerClient(s.conn.ClientConn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
Expand Down Expand Up @@ -269,7 +262,7 @@ func (s *session) watch(ctx context.Context) error {
err error
)

client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
for {
// If this is the first time we're running the loop, or there was a reference mismatch
// attempt to get the assignmentWatch
Expand Down Expand Up @@ -344,7 +337,7 @@ func (s *session) watch(ctx context.Context) error {

// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
Expand Down Expand Up @@ -385,7 +378,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
return updates, ctx.Err()
}

client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
n := batchSize

if len(updates) < n {
Expand Down Expand Up @@ -416,8 +409,7 @@ func (s *session) sendError(err error) {
func (s *session) close() error {
s.closeOnce.Do(func() {
if s.conn != nil {
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -remotes.DefaultObservationWeight)
s.conn.Close()
s.conn.Close(false)
}

close(s.closed)
Expand Down
72 changes: 36 additions & 36 deletions ca/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/cloudflare/cfssl/signer/local"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/ioutils"
"github.com/docker/swarmkit/remotes"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
Expand Down Expand Up @@ -169,6 +169,15 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
if err == nil {
break
}

// If the first attempt fails, we should try a remote
// connection. The local node may be a manager that was
// demoted, so the local connection (which is preferred) may
// not work. If we are successful in renewing the certificate,
// the local connection will not be returned by the connection
// broker anymore.
config.ForceRemote = true

}
if err != nil {
return nil, err
Expand Down Expand Up @@ -202,7 +211,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit

var kekUpdate *KEKData
for i := 0; i < 5; i++ {
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes)
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.ConnBroker)
if err == nil {
break
}
Expand All @@ -218,7 +227,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
return &tlsKeyPair, nil
}

func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, r remotes.Remotes) (*KEKData, error) {
func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, connBroker *connectionbroker.Broker) (*KEKData, error) {
var managerRole bool
for _, ou := range cert.Subject.OrganizationalUnit {
if ou == ManagerRole {
Expand All @@ -229,25 +238,25 @@ func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, key

if managerRole {
mtlsCreds := credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rca.Pool, Certificates: []tls.Certificate{keypair}})
conn, peer, err := getGRPCConnection(mtlsCreds, r)
conn, err := getGRPCConnection(mtlsCreds, connBroker, false)
if err != nil {
return nil, err
}
defer conn.Close()

client := api.NewCAClient(conn)
client := api.NewCAClient(conn.ClientConn)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
response, err := client.GetUnlockKey(ctx, &api.GetUnlockKeyRequest{})
if err != nil {
if grpc.Code(err) == codes.Unimplemented { // if the server does not support keks, return as if no encryption key was specified
conn.Close(true)
return &KEKData{}, nil
}

r.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}
r.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(true)
return &KEKData{KEK: response.UnlockKey, Version: response.Version.Index}, nil
}

Expand Down Expand Up @@ -440,45 +449,33 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) {
return NewRootCA(cert, key, DefaultNodeCertExpiration)
}

func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) {
peer, err := r.Select()
if err != nil {
return nil, api.Peer{}, err
}

opts := []grpc.DialOption{
func getGRPCConnection(creds credentials.TransportCredentials, connBroker *connectionbroker.Broker, forceRemote bool) (*connectionbroker.Conn, error) {
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithTimeout(5 * time.Second),
grpc.WithBackoffMaxDelay(5 * time.Second),
}

conn, err := grpc.Dial(peer.Addr, opts...)
if err != nil {
return nil, api.Peer{}, err
if forceRemote {
return connBroker.SelectRemote(dialOpts...)
}
return conn, peer, nil
return connBroker.Select(dialOpts...)
}

// GetRemoteCA returns the remote endpoint's CA certificate
func GetRemoteCA(ctx context.Context, d digest.Digest, r remotes.Remotes) (RootCA, error) {
func GetRemoteCA(ctx context.Context, d digest.Digest, connBroker *connectionbroker.Broker) (RootCA, error) {
// This TLS Config is intentionally using InsecureSkipVerify. We use the
// digest instead to check the integrity of the CA certificate.
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, peer, err := getGRPCConnection(insecureCreds, r)
conn, err := getGRPCConnection(insecureCreds, connBroker, false)
if err != nil {
return RootCA{}, err
}
defer conn.Close()

client := api.NewCAClient(conn)
client := api.NewCAClient(conn.ClientConn)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
defer func() {
if err != nil {
r.Observe(peer, -remotes.DefaultObservationWeight)
return
}
r.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(err == nil)
}()
response, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{})
if err != nil {
Expand Down Expand Up @@ -558,20 +555,22 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
}

conn, peer, err := getGRPCConnection(creds, config.Remotes)
conn, err := getGRPCConnection(creds, config.ConnBroker, config.ForceRemote)
if err != nil {
return nil, err
}
defer conn.Close()

// Create a CAClient to retrieve a new Certificate
caClient := api.NewNodeCAClient(conn)
caClient := api.NewNodeCAClient(conn.ClientConn)

issueCtx, issueCancel := context.WithTimeout(ctx, 5*time.Second)
defer issueCancel()

// Send the Request and retrieve the request token
issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token, Availability: config.Availability}
issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
issueResponse, err := caClient.IssueNodeCertificate(issueCtx, issueRequest)
if err != nil {
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}

Expand All @@ -589,13 +588,14 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
defer cancel()
statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
if err != nil {
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}

// If the certificate was issued, return
if statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Certificate == nil {
conn.Close(false)
return nil, errors.New("no certificate in CertificateStatus response")
}

Expand All @@ -605,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
// retry until the certificate gets updated per our
// current request.
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(true)
return statusResponse.Certificate.Certificate, nil
}
}
Expand Down
Loading