diff --git a/lib/client/api.go b/lib/client/api.go index b7a4b604e2c0..57a03924c526 100644 --- a/lib/client/api.go +++ b/lib/client/api.go @@ -2106,8 +2106,6 @@ func (tc *TeleportClient) Join(ctx context.Context, mode types.SessionParticipan } } - fmt.Printf("Joining session with participant mode: %v. \n\n", mode) - // running shell with a given session means "join" it: err = nc.RunInteractiveShell(ctx, mode, session, tc.OnChannelRequest, beforeStart) return trace.Wrap(err) diff --git a/lib/client/kubesession.go b/lib/client/kubesession.go index 0ce75de9522b..b3222ac5530a 100644 --- a/lib/client/kubesession.go +++ b/lib/client/kubesession.go @@ -63,8 +63,6 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT TLSClientConfig: tlsConfig, } - fmt.Printf("Joining session with participant mode: %v. \n\n", mode) - ws, resp, err := dialer.DialContext(ctx, joinEndpoint, nil) if resp != nil && resp.Body != nil { defer resp.Body.Close() diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 544952da0e6b..7f5fef31c124 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -1656,9 +1656,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. } f.setSession(session.id, session) - // When Teleport attaches the original session creator terminal streams to the - // session, we don't want to emit session.join event since it won't be required. - if err = session.join(party, false /* emitSessionJoinEvent */); err != nil { + if err = session.join(party, true /* emitSessionJoinEvent */); err != nil { return trace.Wrap(err) } diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index f09770c5380d..54ed10105b8e 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -498,7 +498,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params s.io.OnReadError = s.disconnectPartyOnErr s.BroadcastMessage("Creating session with ID: %v...", id.String()) - s.BroadcastMessage(srv.SessionControlsInfoBroadcast) go func() { if _, open := <-s.io.TerminateNotifier(); open { @@ -982,7 +981,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { return trace.Wrap(err) } - // we only want to emit the session.join when someone tries to join a session via + // We only want to emit the session.join when someone tries to join a session via // tsh kube join and not when the original session owner terminal streams are // connected to the Kubernetes session. if emitJoinEvent { @@ -993,6 +992,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { if _, err := p.Client.stdoutStream().Write(recentWrites); err != nil { s.log.Warnf("Failed to write history to client: %v.", err) } + s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode) // increment the party track waitgroup. // It is decremented when session.leave() finishes its execution. @@ -1017,10 +1017,17 @@ func (s *session) join(p *party, emitJoinEvent bool) error { if p.Mode == types.SessionPeerMode { s.io.AddReader(stringID, p.Client.stdinStream()) } - s.io.AddWriter(stringID, p.Client.stdoutStream()) - s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode) + // Send the participant mode and controls to the additional participant + if p.Ctx.User.GetName() != s.ctx.User.GetName() { + err := srv.MsgParticipantCtrls(p.Client.stdoutStream(), p.Mode) + if err != nil { + s.log.Errorf("Could not send intro message to participant: %v", err) + } + } + + // Allow the moderator to force terminate the session if p.Mode == types.SessionModeratorMode { s.weakEventsWaiter.Add(1) go func() { @@ -1095,7 +1102,6 @@ func (s *session) join(p *party, emitJoinEvent bool) error { s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStateRunning) } } - return nil } diff --git a/lib/srv/sess.go b/lib/srv/sess.go index 7633ebe054f0..328ca574e629 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -19,6 +19,7 @@ package srv import ( + "bytes" "context" "encoding/json" "errors" @@ -66,10 +67,6 @@ const ( PresenceMaxDifference = time.Minute ) -// SessionControlsInfoBroadcast is sent in tandem with session creation -// to inform any joining users about the session controls. -const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)" - const ( // sessionRecordingWarningMessage is sent when the session recording is // going to be disabled. @@ -86,6 +83,21 @@ var serverSessions = prometheus.NewGauge( }, ) +func MsgParticipantCtrls(w io.Writer, m types.SessionParticipantMode) error { + var modeCtrl bytes.Buffer + modeCtrl.WriteString(fmt.Sprintf("\r\nTeleport > Joining session with participant mode: %s\r\n", string(m))) + modeCtrl.WriteString("Teleport > Controls\r\n") + modeCtrl.WriteString("Teleport > - CTRL-C: Leave the session\r\n") + if m == types.SessionModeratorMode { + modeCtrl.WriteString("Teleport > - t: Forcefully terminate the session\r\n") + } + _, err := w.Write(modeCtrl.Bytes()) + if err != nil { + return fmt.Errorf("could not write bytes: %w", err) + } + return nil +} + // SessionRegistry holds a map of all active sessions on a given // SSH server type SessionRegistry struct { @@ -1292,7 +1304,6 @@ func (s *session) startInteractive(ctx context.Context, scx *ServerContext, p *p s.io.AddReader("reader", inReader) s.io.AddWriter(sessionRecorderID, utils.WriteCloserWithContext(scx.srv.Context(), s.Recorder())) s.BroadcastMessage("Creating session with ID: %v", s.id) - s.BroadcastMessage(SessionControlsInfoBroadcast) if err := s.startTerminal(ctx, scx); err != nil { return trace.Wrap(err) @@ -1942,16 +1953,23 @@ func (s *session) addParty(p *party, mode types.SessionParticipantMode) error { s.participants[p.id] = p p.ctx.AddCloser(p) - // Write last chunk (so the newly joined parties won't stare at a blank - // screen). + // Write last chunk (so the newly joined parties won't stare at a blank screen). if _, err := p.Write(s.io.GetRecentHistory()); err != nil { return trace.Wrap(err) } + s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode) // Register this party as one of the session writers (output will go to it). s.io.AddWriter(string(p.id), p) - s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode) + // Send the participant mode and controls to the additional participant + if s.login != p.login { + err := MsgParticipantCtrls(p.ch, mode) + if err != nil { + s.log.Errorf("Could not send intro message to participant: %v", err) + } + } + s.log.Infof("New party %v joined the session with participant mode: %v.", p.String(), p.mode) if mode == types.SessionPeerMode { diff --git a/lib/web/apiserver_test.go b/lib/web/apiserver_test.go index d824e953a10b..2e2d4411594a 100644 --- a/lib/web/apiserver_test.go +++ b/lib/web/apiserver_test.go @@ -7579,7 +7579,7 @@ func waitForOutputWithDuration(r ReaderWithDeadline, substr string, timeout time timeoutCh := time.After(timeout) var prev string - out := make([]byte, int64(len(substr)*2)) + out := make([]byte, int64(len(substr)*3)) for { select { case <-timeoutCh: @@ -9495,7 +9495,7 @@ func TestModeratedSession(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, peerTerm.Close()) }) - require.NoError(t, waitForOutput(peerTerm, "Teleport > User foo joined the session with participant mode: peer."), "waiting for peer to enter session") + require.NoError(t, waitForOutput(peerTerm, "Teleport > Waiting for required participants..."), "waiting for peer to enter session") moderatorTerm, err := connectToHost(ctx, connectConfig{ pack: s.authPack(t, "bar", moderatorRole.GetName()), @@ -9613,7 +9613,7 @@ func TestModeratedSessionWithMFA(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, peerTerm.Close()) }) - require.NoError(t, waitForOutput(peerTerm, "Teleport > User foo joined the session with participant mode: peer."), "waiting for peer to start session") + require.NoError(t, waitForOutput(peerTerm, "Teleport > Waiting for required participants..."), "waiting for peer to start session") moderatorTerm, err := connectToHost(ctx, connectConfig{ pack: moderator,