From 2fc8b5f87209556abc7d7c6ebf18cdaba4143325 Mon Sep 17 00:00:00 2001 From: "Santiago M. Mola" Date: Tue, 23 Jan 2024 15:29:52 +0100 Subject: [PATCH] Refine shell tracing tests (#2028) * Make initial asserts more explicit. * Assert `resource`. * Align with RFC --- tests/appsec/test_shell_execution.py | 77 ++++++++++++++-------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/tests/appsec/test_shell_execution.py b/tests/appsec/test_shell_execution.py index 8117876a75..206b871cd6 100644 --- a/tests/appsec/test_shell_execution.py +++ b/tests/appsec/test_shell_execution.py @@ -2,11 +2,10 @@ # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2021 Datadog, Inc. -import json -from utils import coverage, interfaces, weblog, features, irrelevant -from utils._context.core import context +from utils import bug, context, coverage, interfaces, weblog, features, irrelevant, rfc +@rfc("https://docs.google.com/document/d/1YYxOB1nM032H-lgXrVml9mukMhF4eHVIzyK9H_PvrSY/edit#heading=h.o5gstqo08gu5") @features.appsec_shell_execution_tracing @coverage.basic class Test_ShellExecution: @@ -14,22 +13,25 @@ class Test_ShellExecution: @staticmethod def fetch_command_execution_span(r): - for _, trace in interfaces.library.get_traces(request=r): - for span in trace: - if span["name"] == "command_execution": - command_exec_span = span - assert r.status_code == 200 - assert command_exec_span is not None, f"command execution span hasn't be found for {r.url}" - assert command_exec_span["name"] == "command_execution" - assert command_exec_span["meta"]["component"] == "subprocess" - return command_exec_span + + traces = [t for _, t in interfaces.library.get_traces(request=r)] + assert traces, "No traces found" + assert len(traces) == 1 + spans = traces[0] + spans = [s for s in spans if s["name"] == "command_execution"] + assert spans, "No command_execution span found" + assert len(spans) == 1, "More than one command_execution span found" + + span = spans[0] + assert span["name"] == "command_execution" + assert span["type"] == "system" + assert span["meta"]["component"] == "subprocess" + return span def setup_track_cmd_exec(self): self.r_cmd_exec = weblog.post( - "/shell_execution", - headers={"Content-Type": "application/json"}, - data=json.dumps({"command": "echo", "options": {"shell": False}, "args": "foo"}), + "/shell_execution", json={"command": "echo", "options": {"shell": False}, "args": "foo"}, ) @irrelevant( @@ -37,30 +39,28 @@ def setup_track_cmd_exec(self): reason="For PHP 7.4+", ) def test_track_cmd_exec(self): - shell_exec_span = self.fetch_command_execution_span(self.r_cmd_exec) - assert shell_exec_span["meta"]["cmd.exec"] == '["echo","foo"]' - assert shell_exec_span["meta"]["cmd.exit_code"] == "0" + span = self.fetch_command_execution_span(self.r_cmd_exec) + assert span["resource"] == "echo" + assert span["meta"]["cmd.exec"] == '["echo","foo"]' + assert span["meta"]["cmd.exit_code"] == "0" def setup_track_shell_exec(self): self.r_shell_exec = weblog.post( - "/shell_execution", - headers={"Content-Type": "application/json"}, - data=json.dumps({"command": "echo", "options": {"shell": True}, "args": "foo"}), + "/shell_execution", json={"command": "echo", "options": {"shell": True}, "args": "foo"}, ) @irrelevant(library="java", reason="No method for shell execution in Java") + @bug(library="nodejs", reason="resource name handling is inconsistent with the RFC") def test_track_shell_exec(self): - shell_exec_span = self.fetch_command_execution_span(self.r_shell_exec) - - assert shell_exec_span["meta"]["cmd.shell"] == "echo foo" - assert shell_exec_span["meta"]["cmd.exit_code"] == "0" + span = self.fetch_command_execution_span(self.r_shell_exec) + assert span["resource"] == "sh" + assert span["meta"]["cmd.shell"] == "echo foo" + assert span["meta"]["cmd.exit_code"] == "0" def setup_truncated(self): args = ["a" * 4096, "arg"] self.r_truncation = weblog.post( - "/shell_execution", - headers={"Content-Type": "application/json"}, - data=json.dumps({"command": "echo", "options": {"shell": False}, "args": args}), + "/shell_execution", json={"command": "echo", "options": {"shell": False}, "args": args}, ) @irrelevant( @@ -68,17 +68,16 @@ def setup_truncated(self): reason="For PHP 7.4+", ) def test_truncated(self): - shell_exec_span = self.fetch_command_execution_span(self.r_truncation) - assert shell_exec_span["meta"]["cmd.exec"] == '["echo","' + "a" * 4092 + '",""]' - assert shell_exec_span["meta"]["cmd.truncated"] == "true" - assert shell_exec_span["meta"]["cmd.exit_code"] == "0" + span = self.fetch_command_execution_span(self.r_truncation) + assert span["resource"] == "echo" + assert span["meta"]["cmd.exec"] == '["echo","' + "a" * 4092 + '",""]' + assert span["meta"]["cmd.truncated"] == "true" + assert span["meta"]["cmd.exit_code"] == "0" def setup_obfuscation(self): args = "password 1234" self.r_obfuscation = weblog.post( - "/shell_execution", - headers={"Content-Type": "application/json"}, - data=json.dumps({"command": "echo", "options": {"shell": False}, "args": args}), + "/shell_execution", json={"command": "echo", "options": {"shell": False}, "args": args}, ) @irrelevant( @@ -86,7 +85,7 @@ def setup_obfuscation(self): reason="For PHP 7.4+", ) def test_obfuscation(self): - shell_exec_span = self.fetch_command_execution_span(self.r_obfuscation) - - assert shell_exec_span["meta"]["cmd.exec"] == '["echo","password","?"]' - assert shell_exec_span["meta"]["cmd.exit_code"] == "0" + span = self.fetch_command_execution_span(self.r_obfuscation) + assert span["resource"] == "echo" + assert span["meta"]["cmd.exec"] == '["echo","password","?"]' + assert span["meta"]["cmd.exit_code"] == "0"