diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index d506ee420..e9873ba4e 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -44,6 +44,7 @@ THE SOFTWARE. package managed import ( + "context" "crypto/md5" "crypto/sha1" "crypto/sha256" @@ -53,10 +54,10 @@ import ( "net/url" "runtime" "strings" - "sync" "time" "golang.org/x/crypto/ssh" + "golang.org/x/net/proxy" "github.com/fluxcd/source-controller/pkg/git" git2go "github.com/libgit2/git2go/v33" @@ -65,17 +66,6 @@ import ( // registerManagedSSH registers a Go-native implementation of // SSH transport that doesn't rely on any lower-level libraries // such as libssh2. -// -// The underlying SSH connections are kept open and are reused -// across several SSH sessions. This is due to upstream issues in -// which concurrent/parallel SSH connections may lead to instability. -// -// Connections are created on first attempt to use a given remote. The -// connection is removed from the cache on the first failed session related -// operation. -// -// https://github.com/golang/go/issues/51926 -// https://github.com/golang/go/issues/27140 func registerManagedSSH() error { for _, protocol := range []string{"ssh", "ssh+git", "git+ssh"} { _, err := git2go.NewRegisteredSmartTransport(protocol, false, sshSmartSubtransportFactory) @@ -96,32 +86,16 @@ type sshSmartSubtransport struct { transport *git2go.Transport lastAction git2go.SmartServiceAction + conn net.Conn client *ssh.Client session *ssh.Session stdin io.WriteCloser stdout io.Reader currentStream *sshSmartSubtransportStream - ckey string addr string + connected bool } -// aMux is the read-write mutex to control access to sshClients. -var aMux sync.RWMutex - -type cachedClient struct { - *ssh.Client - activeSessions uint16 -} - -// sshClients stores active ssh clients/connections to be reused. -// -// Once opened, connections will be kept cached until an error occurs -// during SSH commands, by which point it will be discarded, leading to -// a follow-up cache miss. -// -// The key must be based on cacheKey, refer to that function's comments. -var sshClients map[string]*cachedClient = make(map[string]*cachedClient) - func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -152,8 +126,6 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi if t.lastAction == git2go.SmartServiceActionUploadpackLs { return t.currentStream, nil } - // Disregard errors from previous stream, futher details inside Close(). - _ = t.Close() } cmd = fmt.Sprintf("git-upload-pack '%s'", uPath) @@ -162,8 +134,6 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi if t.lastAction == git2go.SmartServiceActionReceivepackLs { return t.currentStream, nil } - // Disregard errors from previous stream, futher details inside Close(). - _ = t.Close() } cmd = fmt.Sprintf("git-receive-pack '%s'", uPath) @@ -171,25 +141,27 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return nil, fmt.Errorf("unexpected action: %v", action) } + if t.connected { + // Disregard errors from previous stream, futher details inside Close(). + _ = t.Close() + } + cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHMemory) if err != nil { return nil, err } defer cred.Free() - var addr string port := "22" if u.Port() != "" { port = u.Port() } - addr = fmt.Sprintf("%s:%s", u.Hostname(), port) - t.addr = addr + t.addr = net.JoinHostPort(u.Hostname(), port) - ckey, sshConfig, err := cacheKeyAndConfig(addr, cred) + sshConfig, err := clientConfig(t.addr, cred) if err != nil { return nil, err } - t.ckey = ckey sshConfig.HostKeyCallback = func(hostname string, remote net.Addr, key ssh.PublicKey) error { marshaledKey := key.Marshal() @@ -208,62 +180,54 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return t.transport.SmartCertificateCheck(cert, true, hostname) } - var cacheHit bool - aMux.Lock() - if c, ok := sshClients[ckey]; ok { - traceLog.Info("[ssh]: cache hit", "remoteAddress", addr) - t.client = c.Client - cacheHit = true - c.activeSessions++ - } - aMux.Unlock() - - if t.client == nil { - cacheHit = false - traceLog.Info("[ssh]: cache miss", "remoteAddress", addr) - err := t.createConn(ckey, addr, sshConfig) + // During git operations, there may be multiple actions being + // executed within the same SubTransport. + // If a connection is already established, re-use it. + if !t.connected { + err = t.createConn(t.addr, sshConfig) if err != nil { return nil, err } + t.connected = true } traceLog.Info("[ssh]: creating new ssh session") if t.session, err = t.client.NewSession(); err != nil { - discardCachedSshClient(ckey) - - // if the current connection was cached, we can try again - // as this may be a stale connection. - if !cacheHit { - return nil, err - } - - traceLog.Info("[ssh]: cached connection was stale, retrying...") - err = t.createConn(ckey, addr, sshConfig) - if err != nil { - return nil, err - } - - traceLog.Info("[ssh]: creating new ssh session with new connection") - t.session, err = t.client.NewSession() - if err != nil { - discardCachedSshClient(ckey) - return nil, err - } + return nil, err } if t.stdin, err = t.session.StdinPipe(); err != nil { - discardCachedSshClient(ckey) return nil, err } - if t.stdout, err = t.session.StdoutPipe(); err != nil { - discardCachedSshClient(ckey) + var w *io.PipeWriter + var reader io.Reader + t.stdout, w = io.Pipe() + if reader, err = t.session.StdoutPipe(); err != nil { return nil, err } + // If the session's stdout pipe is not serviced fast + // enough it may cause the remote command to block. + // + // xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558 + go func() error { + defer w.Close() + for { + if !t.connected { + return nil + } + + _, err := io.Copy(w, reader) + if err != nil { + return err + } + time.Sleep(5 * time.Millisecond) + } + }() + traceLog.Info("[ssh]: run on remote", "cmd", cmd) if err := t.session.Start(cmd); err != nil { - discardCachedSshClient(ckey) return nil, err } @@ -275,55 +239,21 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return t.currentStream, nil } -func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error { - // In some scenarios the ssh handshake can hang indefinitely at - // golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop. - // - // xref: https://github.com/golang/go/issues/51926 - done := make(chan error, 1) - var err error - - var c *ssh.Client - go func() { - c, err = ssh.Dial("tcp", addr, sshConfig) - done <- err - }() - - dialTimeout := sshConfig.Timeout + (30 * time.Second) - - select { - case doneErr := <-done: - if doneErr != nil { - err = fmt.Errorf("ssh.Dial: %w", doneErr) - } - case <-time.After(dialTimeout): - err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout) - } +func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error { + ctx, cancel := context.WithTimeout(context.TODO(), sshConnectionTimeOut) + defer cancel() + conn, err := proxy.Dial(ctx, "tcp", addr) if err != nil { return err } - - t.client = c - - // Mutex is set here to avoid the network latency being - // absorbed by all competing goroutines. - aMux.Lock() - defer aMux.Unlock() - - // A different goroutine won the race, dispose the connection - // and carry on. - if _, ok := sshClients[ckey]; ok { - go func() { - _ = c.Close() - }() - return nil + c, chans, reqs, err := ssh.NewClientConn(conn, addr, sshConfig) + if err != nil { + return err } - sshClients[ckey] = &cachedClient{ - Client: c, - activeSessions: 1, - } + t.conn = conn + t.client = ssh.NewClient(c, chans, reqs) return nil } @@ -356,6 +286,14 @@ func (t *sshSmartSubtransport) Close() error { func (t *sshSmartSubtransport) Free() { traceLog.Info("[ssh]: sshSmartSubtransport.Free()") + if t.client != nil { + _ = t.client.Close() + } + + if t.conn != nil { + _ = t.conn.Close() + } + t.connected = false } type sshSmartSubtransportStream struct { @@ -372,36 +310,25 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) { func (stream *sshSmartSubtransportStream) Free() { traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()") - if stream.owner == nil { - return - } - - if stream.owner.ckey != "" { - decrementActiveSessionIfFound(stream.owner.ckey) - } } -func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) { +func clientConfig(remoteAddress string, cred *git2go.Credential) (*ssh.ClientConfig, error) { if cred == nil { - return "", nil, fmt.Errorf("cannot create cache key from a nil credential") + return nil, fmt.Errorf("cannot create cache key from a nil credential") } username, _, privatekey, passphrase, err := cred.GetSSHKey() if err != nil { - return "", nil, err + return nil, err } var pemBytes []byte if cred.Type() == git2go.CredentialTypeSSHMemory { pemBytes = []byte(privatekey) } else { - return "", nil, fmt.Errorf("file based SSH credential is not supported") + return nil, fmt.Errorf("file based SSH credential is not supported") } - // must include the passphrase, otherwise a caller that knows the private key, but - // not its passphrase would be able to bypass auth. - ck := cacheKey(remoteAddress, username, passphrase, pemBytes) - var key ssh.Signer if passphrase != "" { key, err = ssh.ParsePrivateKeyWithPassphrase(pemBytes, []byte(passphrase)) @@ -410,7 +337,7 @@ func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, * } if err != nil { - return "", nil, err + return nil, err } cfg := &ssh.ClientConfig{ @@ -425,68 +352,5 @@ func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, * cfg.HostKeyAlgorithms = git.HostKeyAlgos } - return ck, cfg, nil -} - -// cacheKey generates a cache key that is multi-tenancy safe. -// -// Stablishing multiple and concurrent ssh connections leads to stability -// issues documented above. However, the caching/sharing of already stablished -// connections could represent a vector for users to bypass the ssh authentication -// mechanism. -// -// cacheKey tries to ensure that connections are only shared by users that -// have the exact same remoteAddress and credentials. -func cacheKey(remoteAddress, userName, passphrase string, pubKey []byte) string { - h := sha256.New() - - v := fmt.Sprintf("%s-%s-%s-%v", remoteAddress, userName, passphrase, pubKey) - - h.Write([]byte(v)) - return fmt.Sprintf("%x", h.Sum(nil)) -} - -// discardCachedSshClient discards the cached ssh client, forcing the next git operation -// to create a new one via ssh.Dial. -func discardCachedSshClient(key string) { - aMux.Lock() - defer aMux.Unlock() - - if v, found := sshClients[key]; found { - traceLog.Info("[ssh]: discard cached ssh client", "activeSessions", v.activeSessions) - closeConn := func() { - // run as async goroutine to minimise mutex time in immediate closures. - go func() { - if v.Client != nil { - _ = v.Client.Close() - } - }() - } - - // if no active sessions for this connection, close it right-away. - // otherwise, it may be used by other processes, so remove from cache, - // and schedule a delayed closure. - if v.activeSessions == 0 { - traceLog.Info("[ssh]: closing connection") - closeConn() - } else { - go func() { - // the delay must account for in-flight operations - // that depends on this connection. - time.Sleep(120 * time.Second) - traceLog.Info("[ssh]: closing connection after delay") - closeConn() - }() - } - delete(sshClients, key) - } -} - -func decrementActiveSessionIfFound(key string) { - aMux.Lock() - defer aMux.Unlock() - - if v, found := sshClients[key]; found { - v.activeSessions-- - } + return cfg, nil } diff --git a/pkg/git/libgit2/managed/ssh_test.go b/pkg/git/libgit2/managed/ssh_test.go deleted file mode 100644 index 0b28d5190..000000000 --- a/pkg/git/libgit2/managed/ssh_test.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2022 The Flux authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package managed - -import ( - "testing" -) - -func TestCacheKey(t *testing.T) { - tests := []struct { - name string - remoteAddress1 string - user1 string - passphrase1 string - pubKey1 []byte - remoteAddress2 string - user2 string - passphrase2 string - pubKey2 []byte - expectMatch bool - }{ - { - name: "same remote addresses with no config", - remoteAddress1: "1.1.1.1", - remoteAddress2: "1.1.1.1", - expectMatch: true, - }, - { - name: "same remote addresses with different config", - remoteAddress1: "1.1.1.1", - user1: "joe", - remoteAddress2: "1.1.1.1", - user2: "another-joe", - expectMatch: false, - }, - { - name: "different remote addresses with no config", - remoteAddress1: "8.8.8.8", - remoteAddress2: "1.1.1.1", - expectMatch: false, - }, - { - name: "different remote addresses with same config", - remoteAddress1: "8.8.8.8", - user1: "legit", - remoteAddress2: "1.1.1.1", - user2: "legit", - expectMatch: false, - }, - { - name: "same remote addresses with same pubkey signers", - remoteAddress1: "1.1.1.1", - user1: "same-jane", - pubKey1: []byte{255, 123, 0}, - remoteAddress2: "1.1.1.1", - user2: "same-jane", - pubKey2: []byte{255, 123, 0}, - expectMatch: true, - }, - { - name: "same remote addresses with different pubkey signers", - remoteAddress1: "1.1.1.1", - user1: "same-jane", - pubKey1: []byte{255, 123, 0}, - remoteAddress2: "1.1.1.1", - user2: "same-jane", - pubKey2: []byte{0, 123, 0}, - expectMatch: false, - }, - { - name: "same remote addresses with pubkey signers and passphrases", - remoteAddress1: "1.1.1.1", - user1: "same-jane", - passphrase1: "same-pass", - pubKey1: []byte{255, 123, 0}, - remoteAddress2: "1.1.1.1", - user2: "same-jane", - passphrase2: "same-pass", - pubKey2: []byte{255, 123, 0}, - expectMatch: true, - }, - { - name: "same remote addresses with pubkey signers and different passphrases", - remoteAddress1: "1.1.1.1", - user1: "same-jane", - passphrase1: "same-pass", - pubKey1: []byte{255, 123, 0}, - remoteAddress2: "1.1.1.1", - user2: "same-jane", - passphrase2: "different-pass", - pubKey2: []byte{255, 123, 0}, - expectMatch: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cacheKey1 := cacheKey(tt.remoteAddress1, tt.user1, tt.passphrase1, tt.pubKey1) - cacheKey2 := cacheKey(tt.remoteAddress2, tt.user2, tt.passphrase2, tt.pubKey2) - - if tt.expectMatch && cacheKey1 != cacheKey2 { - t.Errorf("cache keys '%s' and '%s' should match", cacheKey1, cacheKey2) - } - - if !tt.expectMatch && cacheKey1 == cacheKey2 { - t.Errorf("cache keys '%s' and '%s' should not match", cacheKey1, cacheKey2) - } - }) - } -}