Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v14] Displaying mode and controls to additional participants (#46450) #46914

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1999,8 +1999,6 @@ func (tc *TeleportClient) Join(ctx context.Context, mode types.SessionParticipan
}
}

fmt.Printf("Joining session with participant mode: %v. \n\n", mode)

// running shell with a given session means "join" it:
err = nc.RunInteractiveShell(ctx, mode, session, beforeStart)
return trace.Wrap(err)
Expand Down
2 changes: 0 additions & 2 deletions lib/client/kubesession.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT
TLSClientConfig: tlsConfig,
}

fmt.Printf("Joining session with participant mode: %v. \n\n", mode)

ws, resp, err := dialer.DialContext(ctx, joinEndpoint, nil)
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
Expand Down
16 changes: 11 additions & 5 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
s.io.OnReadError = s.disconnectPartyOnErr

s.BroadcastMessage("Creating session with ID: %v...", id.String())
s.BroadcastMessage(srv.SessionControlsInfoBroadcast)

go func() {
if _, open := <-s.io.TerminateNotifier(); open {
Expand Down Expand Up @@ -969,7 +968,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
return trace.Wrap(err)
}

// we only want to emit the session.join when someone tries to join a session via
// We only want to emit the session.join when someone tries to join a session via
// tsh kube join and not when the original session owner terminal streams are
// connected to the Kubernetes session.
if emitJoinEvent {
Expand All @@ -980,6 +979,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
if _, err := p.Client.stdoutStream().Write(recentWrites); err != nil {
s.log.Warnf("Failed to write history to client: %v.", err)
}
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

// increment the party track waitgroup.
// It is decremented when session.leave() finishes its execution.
Expand All @@ -1004,10 +1004,17 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
if p.Mode == types.SessionPeerMode {
s.io.AddReader(stringID, p.Client.stdinStream())
}

s.io.AddWriter(stringID, p.Client.stdoutStream())
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

// Send the participant mode and controls to the additional participant
if p.Ctx.User.GetName() != s.ctx.User.GetName() {
err := srv.MsgParticipantCtrls(p.Client.stdoutStream(), p.Mode)
if err != nil {
s.log.Errorf("Could not send intro message to participant: %v", err)
}
}

// Allow the moderator to force terminate the session
if p.Mode == types.SessionModeratorMode {
s.weakEventsWaiter.Add(1)
go func() {
Expand Down Expand Up @@ -1080,7 +1087,6 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStateRunning)
}
}

return nil
}

Expand Down
34 changes: 26 additions & 8 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package srv

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -65,10 +66,6 @@ const (
PresenceMaxDifference = time.Minute
)

// SessionControlsInfoBroadcast is sent in tandem with session creation
// to inform any joining users about the session controls.
const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)"

const (
// sessionRecordingWarningMessage is sent when the session recording is
// going to be disabled.
Expand All @@ -85,6 +82,21 @@ var serverSessions = prometheus.NewGauge(
},
)

func MsgParticipantCtrls(w io.Writer, m types.SessionParticipantMode) error {
var modeCtrl bytes.Buffer
modeCtrl.WriteString(fmt.Sprintf("\r\nTeleport > Joining session with participant mode: %s\r\n", string(m)))
modeCtrl.WriteString("Teleport > Controls\r\n")
modeCtrl.WriteString("Teleport > - CTRL-C: Leave the session\r\n")
if m == types.SessionModeratorMode {
modeCtrl.WriteString("Teleport > - t: Forcefully terminate the session\r\n")
}
_, err := w.Write(modeCtrl.Bytes())
if err != nil {
return fmt.Errorf("could not write bytes: %w", err)
}
return nil
}

// SessionRegistry holds a map of all active sessions on a given
// SSH server
type SessionRegistry struct {
Expand Down Expand Up @@ -1295,7 +1307,6 @@ func (s *session) startInteractive(ctx context.Context, scx *ServerContext, p *p
s.io.AddReader("reader", inReader)
s.io.AddWriter(sessionRecorderID, utils.WriteCloserWithContext(scx.srv.Context(), s.Recorder()))
s.BroadcastMessage("Creating session with ID: %v", s.id)
s.BroadcastMessage(SessionControlsInfoBroadcast)

if err := s.startTerminal(ctx, scx); err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -1950,16 +1961,23 @@ func (s *session) addParty(p *party, mode types.SessionParticipantMode) error {
s.participants[p.id] = p
p.ctx.AddCloser(p)

// Write last chunk (so the newly joined parties won't stare at a blank
// screen).
// Write last chunk (so the newly joined parties won't stare at a blank screen).
if _, err := p.Write(s.io.GetRecentHistory()); err != nil {
return trace.Wrap(err)
}
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode)

// Register this party as one of the session writers (output will go to it).
s.io.AddWriter(string(p.id), p)

s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode)
// Send the participant mode and controls to the additional participant
if s.login != p.login {
err := MsgParticipantCtrls(p.ch, mode)
if err != nil {
s.log.Errorf("Could not send intro message to participant: %v", err)
}
}

s.log.Infof("New party %v joined the session with participant mode: %v.", p.String(), p.mode)

if mode == types.SessionPeerMode {
Expand Down
19 changes: 15 additions & 4 deletions lib/web/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7590,7 +7590,7 @@
timeoutCh := time.After(10 * time.Second)

var prev string
out := make([]byte, int64(len(substr)*2))
out := make([]byte, int64(len(substr)*3))
for {
select {
case <-timeoutCh:
Expand Down Expand Up @@ -9528,7 +9528,7 @@
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, peerWS.Close()) })

peerStream := terminal.NewStream(ctx, terminal.StreamConfig{WS: peerWS})
require.NoError(t, waitForOutput(peerTerm, "Teleport > Waiting for required participants..."), "waiting for peer to enter session")

require.NoError(t, waitForOutput(peerStream, "Teleport > User foo joined the session with participant mode: peer."))

Expand Down Expand Up @@ -9646,8 +9646,19 @@
challenge, err := moderatorStream.ReadChallenge(protobufMFACodec{})
require.NoError(t, err)

res, err := moderator.device.SolveAuthn(challenge)
require.NoError(t, err)
require.NoError(t, waitForOutput(peerTerm, "Teleport > User foo joined the session with participant mode: peer."), "waiting for peer to start session")

moderatorTerm, err := connectToHost(ctx, connectConfig{
pack: moderator,
host: s.node.ID(),
proxy: s.webServer.Listener.Addr().String(),
sessionID: peerTerm.GetSession().ID,
participantMode: types.SessionModeratorMode,
mfaCeremony: func(challenge client.MFAAuthenticateChallenge) []byte {
res, err := moderator.device.SolveAuthn(&authproto.MFAAuthenticateChallenge{
WebauthnChallenge: wantypes.CredentialAssertionToProto(challenge.WebauthnChallenge),
})
require.NoError(t, err)

webauthnResBytes, err := json.Marshal(wantypes.CredentialAssertionResponseFromProto(res.GetWebauthn()))
require.NoError(t, err)
Expand All @@ -9661,16 +9672,16 @@
require.NoError(t, err)

require.NoError(t, moderatorWS.WriteMessage(websocket.BinaryMessage, envelopeBytes))
}

Check failure on line 9675 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' before newline in composite literal (typecheck)

// advance the clock far enough in the future to make the moderator stale
// which will terminate the session
s.clock.Advance(180 * time.Second)

Check failure on line 9679 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' before newline in composite literal (typecheck)
require.NoError(t, waitForOutput(moderatorStream, "wait: remote command exited without exit status or exit signal"))

Check failure on line 9680 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' before newline in composite literal (typecheck)
require.NoError(t, waitForOutput(peerStream, "Process exited with status 255"))

Check failure on line 9681 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' before newline in composite literal (typecheck)
}

Check failure on line 9682 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' before newline in argument list (typecheck)

func handleMFAWebauthnChallenge(t *testing.T, ws *websocket.Conn, dev *auth.TestDevice) {

Check failure on line 9684 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

expected '(', found handleMFAWebauthnChallenge (typecheck)
// Wait for websocket authn challenge event.
ty, raw, err := ws.ReadMessage()
require.NoError(t, err)
Expand Down Expand Up @@ -9699,9 +9710,9 @@
require.NoError(t, err)

require.NoError(t, ws.WriteMessage(websocket.BinaryMessage, envelopeBytes))
}

Check failure on line 9713 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' before newline in argument list (typecheck)

type proxyClientMock struct {

Check failure on line 9715 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

expected operand, found 'type' (typecheck)
authclient.ClientI
tokens map[string]types.ProvisionToken
}
Expand All @@ -9709,7 +9720,7 @@
// GetToken returns provisioning token
func (pc *proxyClientMock) GetToken(_ context.Context, token string) (types.ProvisionToken, error) {
tok, ok := pc.tokens[token]
if ok {

Check failure on line 9723 in lib/web/apiserver_test.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

missing ',' in argument list (typecheck)
return tok, nil
}

Expand Down
Loading