From aec9e883b0a5ba29befd98a445049e2a2c8c3bd9 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Sun, 28 Jul 2024 14:10:52 +0200 Subject: [PATCH 1/2] Limit intercepts based on container port number and protocol, not pod. This commit gets rid of an unnecessary limitation in the traffic-agent that prohibits more than one simultaneous intercept per pod. Instead, an intercept is now limited to the unique combination of `` and ``, where `` is UDP or TCP and `` is a container port. Signed-off-by: Thomas Hallgren --- cmd/traffic/cmd/agent/agent.go | 4 +- cmd/traffic/cmd/agent/fwdstate.go | 40 +++++++------ cmd/traffic/cmd/agent/state.go | 36 ------------ cmd/traffic/cmd/agent/state_test.go | 2 +- integration_test/testdata/k8s/echo-two.yaml | 65 +++++++++++++++++++++ 5 files changed, 90 insertions(+), 57 deletions(-) create mode 100644 integration_test/testdata/k8s/echo-two.yaml diff --git a/cmd/traffic/cmd/agent/agent.go b/cmd/traffic/cmd/agent/agent.go index 1382698a23..a6416fcb4f 100644 --- a/cmd/traffic/cmd/agent/agent.go +++ b/cmd/traffic/cmd/agent/agent.go @@ -139,7 +139,7 @@ func Main(ctx context.Context, _ ...string) error { EnableSignalHandling: true, }) - s := NewSimpleState(config) + s := NewState(config) info, err := StartServices(ctx, g, config, s) if err != nil { return err @@ -154,7 +154,7 @@ func Main(ctx context.Context, _ ...string) error { return g.Wait() } -func sidecar(ctx context.Context, s SimpleState, info *rpc.AgentInfo) error { +func sidecar(ctx context.Context, s State, info *rpc.AgentInfo) error { // Manage the forwarders ac := s.AgentConfig() for _, cn := range ac.Containers { diff --git a/cmd/traffic/cmd/agent/fwdstate.go b/cmd/traffic/cmd/agent/fwdstate.go index 2cf908d8d6..d885292de0 100644 --- a/cmd/traffic/cmd/agent/fwdstate.go +++ b/cmd/traffic/cmd/agent/fwdstate.go @@ -14,22 +14,23 @@ import ( ) type fwdState struct { - *simpleState - intercept InterceptTarget - forwarder forwarder.Interceptor - mountPoint string - env map[string]string + *state + intercept InterceptTarget + forwarder forwarder.Interceptor + mountPoint string + env map[string]string + chosenIntercept *manager.InterceptInfo } -// NewInterceptState creates a InterceptState that performs intercepts by using an Interceptor which indiscriminately +// NewInterceptState creates an InterceptState that performs intercepts by using an Interceptor which indiscriminately // intercepts all traffic to the port that it forwards. -func (s *simpleState) NewInterceptState(forwarder forwarder.Interceptor, intercept InterceptTarget, mountPoint string, env map[string]string) InterceptState { +func (s *state) NewInterceptState(forwarder forwarder.Interceptor, intercept InterceptTarget, mountPoint string, env map[string]string) InterceptState { return &fwdState{ - simpleState: s, - mountPoint: mountPoint, - intercept: intercept, - forwarder: forwarder, - env: env, + state: s, + mountPoint: mountPoint, + intercept: intercept, + forwarder: forwarder, + env: env, } } @@ -74,17 +75,20 @@ func (pm *ProviderMux) CreateClientStream(ctx context.Context, sessionID string, func (fs *fwdState) HandleIntercepts(ctx context.Context, cepts []*manager.InterceptInfo) []*manager.ReviewInterceptRequest { var myChoice, activeIntercept *manager.InterceptInfo - - // Find the chosen intercept if it still exists if fs.chosenIntercept != nil { - for _, cept := range cepts { - if cept == fs.chosenIntercept { - myChoice = cept + chosenID := fs.chosenIntercept.Id + for _, is := range cepts { + if chosenID == is.Id { + fs.chosenIntercept = is + myChoice = is break } } - if myChoice != nil && myChoice.Disposition == manager.InterceptDispositionType_ACTIVE { + if myChoice == nil { + // Chosen intercept is not present in the snapshot + fs.chosenIntercept = nil + } else if myChoice.Disposition == manager.InterceptDispositionType_ACTIVE { // The chosen intercept still exists and is active activeIntercept = myChoice } diff --git a/cmd/traffic/cmd/agent/state.go b/cmd/traffic/cmd/agent/state.go index 60845e46dd..4d436f2c44 100644 --- a/cmd/traffic/cmd/agent/state.go +++ b/cmd/traffic/cmd/agent/state.go @@ -32,10 +32,6 @@ type State interface { SetManager(ctx context.Context, sessionInfo *manager.SessionInfo, manager manager.ManagerClient, version semver.Version) FtpPort() uint16 SftpPort() uint16 -} - -type SimpleState interface { - State NewInterceptState(forwarder forwarder.Interceptor, target InterceptTarget, mountPoint string, env map[string]string) InterceptState } @@ -64,11 +60,6 @@ type state struct { agent.UnimplementedAgentServer } -type simpleState struct { - state - chosenIntercept *manager.InterceptInfo -} - func (s *state) ManagerClient() manager.ManagerClient { return s.manager } @@ -94,14 +85,6 @@ func NewState(config Config) State { } } -func NewSimpleState(config Config) SimpleState { - return &simpleState{state: state{ - Config: config, - dialWatchers: xsync.NewMapOf[string, chan *manager.DialRequest](), - awaitingForwards: xsync.NewMapOf[string, *xsync.MapOf[tunnel.ConnID, *awaitingForward]](), - }} -} - func (s *state) AddInterceptState(is InterceptState) { s.interceptStates = append(s.interceptStates, is) } @@ -131,25 +114,6 @@ func (s *state) HandleIntercepts(ctx context.Context, iis []*manager.InterceptIn return rs } -func (s *simpleState) HandleIntercepts(ctx context.Context, iis []*manager.InterceptInfo) []*manager.ReviewInterceptRequest { - if s.chosenIntercept != nil { - chosenID := s.chosenIntercept.Id - found := false - for _, is := range iis { - if chosenID == is.Id { - found = true - s.chosenIntercept = is - break - } - } - if !found { - // Chosen intercept is not present in the snapshot - s.chosenIntercept = nil - } - } - return s.state.HandleIntercepts(ctx, iis) -} - func (s *state) InterceptInfo(ctx context.Context, callerID, path string, containerPort uint16, headers http.Header) (*restapi.InterceptInfo, error) { if containerPort == 0 && len(s.interceptStates) == 1 { containerPort = s.interceptStates[0].Target().ContainerPort() diff --git a/cmd/traffic/cmd/agent/state_test.go b/cmd/traffic/cmd/agent/state_test.go index 2ad56bdb1e..520249277b 100644 --- a/cmd/traffic/cmd/agent/state_test.go +++ b/cmd/traffic/cmd/agent/state_test.go @@ -40,7 +40,7 @@ func makeFS(t *testing.T, ctx context.Context) (forwarder.Interceptor, agent.Sta c, err := agent.LoadConfig(ctx) require.NoError(t, err) - s := agent.NewSimpleState(c) + s := agent.NewState(c) cn := c.AgentConfig().Containers[0] cnMountPoint := filepath.Join(agentconfig.ExportsMountPoint, filepath.Base(cn.MountPoint)) s.AddInterceptState(s.NewInterceptState(f, agent.NewInterceptTarget(cn.Intercepts), cnMountPoint, map[string]string{})) diff --git a/integration_test/testdata/k8s/echo-two.yaml b/integration_test/testdata/k8s/echo-two.yaml new file mode 100644 index 0000000000..6a530167b8 --- /dev/null +++ b/integration_test/testdata/k8s/echo-two.yaml @@ -0,0 +1,65 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: echo-one +spec: + type: ClusterIP + selector: + app: echo-both + ports: + - name: one + port: 80 + targetPort: echo-one +--- +apiVersion: v1 +kind: Service +metadata: + name: echo-two +spec: + type: ClusterIP + selector: + app: echo-both + ports: + - name: two + port: 80 + targetPort: echo-two +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: echo-both + labels: + app: echo-both +spec: + replicas: 1 + selector: + matchLabels: + app: echo-both + template: + metadata: + labels: + app: echo-both + spec: + containers: + - name: echo-one + image: jmalloc/echo-server + ports: + - name: echo-one + containerPort: 8080 + resources: + limits: + cpu: 50m + memory: 128Mi + - name: echo-two + image: jmalloc/echo-server + ports: + - name: echo-two + containerPort: 8081 + env: + - name: PORT + value: "8081" + resources: + limits: + cpu: 50m + memory: 128Mi From d406f9e2a511b7e2f133157b73657ebb0f24c21d Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Sat, 17 Aug 2024 12:24:26 +0200 Subject: [PATCH 2/2] Fix intermittent failure in agent_injector_test.go Rollouts caused modifications in the test setup. Signed-off-by: Thomas Hallgren --- cmd/traffic/cmd/manager/mutator/agent_injector_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/traffic/cmd/manager/mutator/agent_injector_test.go b/cmd/traffic/cmd/manager/mutator/agent_injector_test.go index 9325542dd5..e72ef16f2f 100644 --- a/cmd/traffic/cmd/manager/mutator/agent_injector_test.go +++ b/cmd/traffic/cmd/manager/mutator/agent_injector_test.go @@ -795,6 +795,8 @@ func TestTrafficAgentConfigGenerator(t *testing.T) { for _, test := range tests { test := test // pin it + pod := test.request + cw.Blacklist(pod.Name, pod.Namespace) // prevent rollout agentmap.GeneratorConfigFunc = env.GeneratorConfig t.Run(test.name, func(t *testing.T) { runFunc(t, ctx, &test) @@ -1840,6 +1842,7 @@ func TestTrafficAgentInjector(t *testing.T) { cw := NewWatcher("") cw.Start(ctx) require.NoError(t, cw.StartWatchers(ctx)) + cw.Blacklist(test.pod.Name, test.pod.Namespace) var actualPatch PatchOps var actualErr error