Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] Displaying mode and controls to additional participants (#46450) #46901

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2148,8 +2148,6 @@ func (tc *TeleportClient) Join(ctx context.Context, mode types.SessionParticipan
}
}

fmt.Printf("Joining session with participant mode: %v. \n\n", mode)

// running shell with a given session means "join" it:
err = nc.RunInteractiveShell(ctx, mode, session, tc.OnChannelRequest, beforeStart)
return trace.Wrap(err)
Expand Down
2 changes: 0 additions & 2 deletions lib/client/kubesession.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func NewKubeSession(ctx context.Context, tc *TeleportClient, meta types.SessionT
TLSClientConfig: tlsConfig,
}

fmt.Printf("Joining session with participant mode: %v. \n\n", mode)

ws, resp, err := dialer.DialContext(ctx, joinEndpoint, nil)
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
Expand Down
4 changes: 1 addition & 3 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,9 +1674,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
}

f.setSession(session.id, session)
// When Teleport attaches the original session creator terminal streams to the
// session, we don't want to emit session.join event since it won't be required.
if err = session.join(party, false /* emitSessionJoinEvent */); err != nil {
if err = session.join(party, true /* emitSessionJoinEvent */); err != nil {
return trace.Wrap(err)
}

Expand Down
16 changes: 11 additions & 5 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
s.io.OnReadError = s.disconnectPartyOnErr

s.BroadcastMessage("Creating session with ID: %v...", id.String())
s.BroadcastMessage(srv.SessionControlsInfoBroadcast)

go func() {
if _, open := <-s.io.TerminateNotifier(); open {
Expand Down Expand Up @@ -982,7 +981,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
return trace.Wrap(err)
}

// we only want to emit the session.join when someone tries to join a session via
// We only want to emit the session.join when someone tries to join a session via
// tsh kube join and not when the original session owner terminal streams are
// connected to the Kubernetes session.
if emitJoinEvent {
Expand All @@ -993,6 +992,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
if _, err := p.Client.stdoutStream().Write(recentWrites); err != nil {
s.log.Warnf("Failed to write history to client: %v.", err)
}
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

// increment the party track waitgroup.
// It is decremented when session.leave() finishes its execution.
Expand All @@ -1017,10 +1017,17 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
if p.Mode == types.SessionPeerMode {
s.io.AddReader(stringID, p.Client.stdinStream())
}

s.io.AddWriter(stringID, p.Client.stdoutStream())
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

// Send the participant mode and controls to the additional participant
if p.Ctx.User.GetName() != s.ctx.User.GetName() {
err := srv.MsgParticipantCtrls(p.Client.stdoutStream(), p.Mode)
if err != nil {
s.log.Errorf("Could not send intro message to participant: %v", err)
}
}

// Allow the moderator to force terminate the session
if p.Mode == types.SessionModeratorMode {
s.weakEventsWaiter.Add(1)
go func() {
Expand Down Expand Up @@ -1095,7 +1102,6 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStateRunning)
}
}

return nil
}

Expand Down
34 changes: 26 additions & 8 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package srv

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -66,10 +67,6 @@ const (
PresenceMaxDifference = time.Minute
)

// SessionControlsInfoBroadcast is sent in tandem with session creation
// to inform any joining users about the session controls.
const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)"

const (
// sessionRecordingWarningMessage is sent when the session recording is
// going to be disabled.
Expand All @@ -86,6 +83,21 @@ var serverSessions = prometheus.NewGauge(
},
)

func MsgParticipantCtrls(w io.Writer, m types.SessionParticipantMode) error {
var modeCtrl bytes.Buffer
modeCtrl.WriteString(fmt.Sprintf("\r\nTeleport > Joining session with participant mode: %s\r\n", string(m)))
modeCtrl.WriteString("Teleport > Controls\r\n")
modeCtrl.WriteString("Teleport > - CTRL-C: Leave the session\r\n")
if m == types.SessionModeratorMode {
modeCtrl.WriteString("Teleport > - t: Forcefully terminate the session\r\n")
}
_, err := w.Write(modeCtrl.Bytes())
if err != nil {
return fmt.Errorf("could not write bytes: %w", err)
}
return nil
}

// SessionRegistry holds a map of all active sessions on a given
// SSH server
type SessionRegistry struct {
Expand Down Expand Up @@ -1291,7 +1303,6 @@ func (s *session) startInteractive(ctx context.Context, scx *ServerContext, p *p
s.io.AddReader("reader", inReader)
s.io.AddWriter(sessionRecorderID, utils.WriteCloserWithContext(scx.srv.Context(), s.Recorder()))
s.BroadcastMessage("Creating session with ID: %v", s.id)
s.BroadcastMessage(SessionControlsInfoBroadcast)

if err := s.startTerminal(ctx, scx); err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -1941,16 +1952,23 @@ func (s *session) addParty(p *party, mode types.SessionParticipantMode) error {
s.participants[p.id] = p
p.ctx.AddCloser(p)

// Write last chunk (so the newly joined parties won't stare at a blank
// screen).
// Write last chunk (so the newly joined parties won't stare at a blank screen).
if _, err := p.Write(s.io.GetRecentHistory()); err != nil {
return trace.Wrap(err)
}
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode)

// Register this party as one of the session writers (output will go to it).
s.io.AddWriter(string(p.id), p)

s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.user, p.mode)
// Send the participant mode and controls to the additional participant
if s.login != p.login {
err := MsgParticipantCtrls(p.ch, mode)
if err != nil {
s.log.Errorf("Could not send intro message to participant: %v", err)
}
}

s.log.Infof("New party %v joined the session with participant mode: %v.", p.String(), p.mode)

if mode == types.SessionPeerMode {
Expand Down
6 changes: 3 additions & 3 deletions lib/web/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7765,7 +7765,7 @@ func waitForOutputWithDuration(r ReaderWithDeadline, substr string, timeout time
timeoutCh := time.After(timeout)

var prev string
out := make([]byte, int64(len(substr)*2))
out := make([]byte, int64(len(substr)*3))
for {
select {
case <-timeoutCh:
Expand Down Expand Up @@ -9831,7 +9831,7 @@ func TestModeratedSession(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, peerTerm.Close()) })

require.NoError(t, waitForOutput(peerTerm, "Teleport > User foo joined the session with participant mode: peer."), "waiting for peer to enter session")
require.NoError(t, waitForOutput(peerTerm, "Teleport > Waiting for required participants..."), "waiting for peer to enter session")

moderatorTerm, err := connectToHost(ctx, connectConfig{
pack: s.authPack(t, "bar", moderatorRole.GetName()),
Expand Down Expand Up @@ -9949,7 +9949,7 @@ func TestModeratedSessionWithMFA(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, peerTerm.Close()) })

require.NoError(t, waitForOutput(peerTerm, "Teleport > User foo joined the session with participant mode: peer."), "waiting for peer to start session")
require.NoError(t, waitForOutput(peerTerm, "Teleport > Waiting for required participants..."), "waiting for peer to start session")

moderatorTerm, err := connectToHost(ctx, connectConfig{
pack: moderator,
Expand Down
Loading