Skip to content

Commit

Permalink
Merge branch 'main' into emmett.butler/http-header-tags-server-support
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler authored Jan 23, 2024
2 parents 114af48 + 2fc8b5f commit 7f12215
Showing 1 changed file with 38 additions and 39 deletions.
77 changes: 38 additions & 39 deletions tests/appsec/test_shell_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,90 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.

import json
from utils import coverage, interfaces, weblog, features, irrelevant
from utils._context.core import context
from utils import bug, context, coverage, interfaces, weblog, features, irrelevant, rfc


@rfc("https://docs.google.com/document/d/1YYxOB1nM032H-lgXrVml9mukMhF4eHVIzyK9H_PvrSY/edit#heading=h.o5gstqo08gu5")
@features.appsec_shell_execution_tracing
@coverage.basic
class Test_ShellExecution:
"""Test shell execution tracing"""

@staticmethod
def fetch_command_execution_span(r):
for _, trace in interfaces.library.get_traces(request=r):
for span in trace:
if span["name"] == "command_execution":
command_exec_span = span

assert r.status_code == 200
assert command_exec_span is not None, f"command execution span hasn't be found for {r.url}"
assert command_exec_span["name"] == "command_execution"
assert command_exec_span["meta"]["component"] == "subprocess"
return command_exec_span

traces = [t for _, t in interfaces.library.get_traces(request=r)]
assert traces, "No traces found"
assert len(traces) == 1
spans = traces[0]
spans = [s for s in spans if s["name"] == "command_execution"]
assert spans, "No command_execution span found"
assert len(spans) == 1, "More than one command_execution span found"

span = spans[0]
assert span["name"] == "command_execution"
assert span["type"] == "system"
assert span["meta"]["component"] == "subprocess"
return span

def setup_track_cmd_exec(self):
self.r_cmd_exec = weblog.post(
"/shell_execution",
headers={"Content-Type": "application/json"},
data=json.dumps({"command": "echo", "options": {"shell": False}, "args": "foo"}),
"/shell_execution", json={"command": "echo", "options": {"shell": False}, "args": "foo"},
)

@irrelevant(
context.library == "php" and "-7." in context.weblog_variant and "7.4" not in context.weblog_variant,
reason="For PHP 7.4+",
)
def test_track_cmd_exec(self):
shell_exec_span = self.fetch_command_execution_span(self.r_cmd_exec)
assert shell_exec_span["meta"]["cmd.exec"] == '["echo","foo"]'
assert shell_exec_span["meta"]["cmd.exit_code"] == "0"
span = self.fetch_command_execution_span(self.r_cmd_exec)
assert span["resource"] == "echo"
assert span["meta"]["cmd.exec"] == '["echo","foo"]'
assert span["meta"]["cmd.exit_code"] == "0"

def setup_track_shell_exec(self):
self.r_shell_exec = weblog.post(
"/shell_execution",
headers={"Content-Type": "application/json"},
data=json.dumps({"command": "echo", "options": {"shell": True}, "args": "foo"}),
"/shell_execution", json={"command": "echo", "options": {"shell": True}, "args": "foo"},
)

@irrelevant(library="java", reason="No method for shell execution in Java")
@bug(library="nodejs", reason="resource name handling is inconsistent with the RFC")
def test_track_shell_exec(self):
shell_exec_span = self.fetch_command_execution_span(self.r_shell_exec)

assert shell_exec_span["meta"]["cmd.shell"] == "echo foo"
assert shell_exec_span["meta"]["cmd.exit_code"] == "0"
span = self.fetch_command_execution_span(self.r_shell_exec)
assert span["resource"] == "sh"
assert span["meta"]["cmd.shell"] == "echo foo"
assert span["meta"]["cmd.exit_code"] == "0"

def setup_truncated(self):
args = ["a" * 4096, "arg"]
self.r_truncation = weblog.post(
"/shell_execution",
headers={"Content-Type": "application/json"},
data=json.dumps({"command": "echo", "options": {"shell": False}, "args": args}),
"/shell_execution", json={"command": "echo", "options": {"shell": False}, "args": args},
)

@irrelevant(
context.library == "php" and "-7." in context.weblog_variant and "7.4" not in context.weblog_variant,
reason="For PHP 7.4+",
)
def test_truncated(self):
shell_exec_span = self.fetch_command_execution_span(self.r_truncation)
assert shell_exec_span["meta"]["cmd.exec"] == '["echo","' + "a" * 4092 + '",""]'
assert shell_exec_span["meta"]["cmd.truncated"] == "true"
assert shell_exec_span["meta"]["cmd.exit_code"] == "0"
span = self.fetch_command_execution_span(self.r_truncation)
assert span["resource"] == "echo"
assert span["meta"]["cmd.exec"] == '["echo","' + "a" * 4092 + '",""]'
assert span["meta"]["cmd.truncated"] == "true"
assert span["meta"]["cmd.exit_code"] == "0"

def setup_obfuscation(self):
args = "password 1234"
self.r_obfuscation = weblog.post(
"/shell_execution",
headers={"Content-Type": "application/json"},
data=json.dumps({"command": "echo", "options": {"shell": False}, "args": args}),
"/shell_execution", json={"command": "echo", "options": {"shell": False}, "args": args},
)

@irrelevant(
context.library == "php" and "-7." in context.weblog_variant and "7.4" not in context.weblog_variant,
reason="For PHP 7.4+",
)
def test_obfuscation(self):
shell_exec_span = self.fetch_command_execution_span(self.r_obfuscation)

assert shell_exec_span["meta"]["cmd.exec"] == '["echo","password","?"]'
assert shell_exec_span["meta"]["cmd.exit_code"] == "0"
span = self.fetch_command_execution_span(self.r_obfuscation)
assert span["resource"] == "echo"
assert span["meta"]["cmd.exec"] == '["echo","password","?"]'
assert span["meta"]["cmd.exit_code"] == "0"

0 comments on commit 7f12215

Please sign in to comment.