Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #105 from rocodes/refactor-cli
Browse files Browse the repository at this point in the history
Refactor: CLI wrapper, export and print services, single entrypoint
  • Loading branch information
gonzalo-bulnes authored Jan 12, 2023
2 parents 4fe96bf + d488374 commit d03bf57
Show file tree
Hide file tree
Showing 35 changed files with 3,079 additions and 1,307 deletions.
4 changes: 2 additions & 2 deletions .semgrep/custom-rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ rules:
languages:
- python
severity: ERROR
message: Possible path traversal or insecure directory and file permissions through os.mkdir(). Use securedrop_export.utils.safe_mkdir instead.
message: Possible path traversal or insecure directory and file permissions through os.mkdir(). Use securedrop_export.directory.safe_mkdir instead.
patterns:
- pattern: "....mkdir(...)"
- pattern-not-inside: |
Expand All @@ -58,7 +58,7 @@ rules:
languages:
- python
severity: ERROR
message: Possible path traversal or insecure directory and file permissions through os.makedirs(). Use securedrop_export.utils.safe_mkdir instead.
message: Possible path traversal or insecure directory and file permissions through os.makedirs(). Use securedrop_export.directory.safe_mkdir instead.
patterns:
- pattern: "....makedirs(...)"
- pattern-not-inside: |
Expand Down
101 changes: 101 additions & 0 deletions securedrop_export/archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env python3

import datetime
import json
import logging
import os
import tempfile

from securedrop_export.exceptions import ExportException
from securedrop_export.status import BaseStatus
from securedrop_export.command import Command
from securedrop_export.directory import safe_extractall

logger = logging.getLogger(__name__)


class Status(BaseStatus):
ERROR_ARCHIVE_METADATA = "ERROR_ARCHIVE_METADATA"
ERROR_METADATA_PARSING = "ERROR_METADATA_PARSING"
ERROR_EXTRACTION = "ERROR_EXTRACTION"


class Metadata(object):
"""
Object to parse, validate and store json metadata from the sd-export archive.
"""

METADATA_FILE = "metadata.json"
SUPPORTED_ENCRYPTION_METHODS = ["luks"]

def __init__(self, archive_path: str):
self.metadata_path = os.path.join(archive_path, self.METADATA_FILE)

def validate(self) -> "Metadata":
# Read metadata json and set relevant attributes
try:
with open(self.metadata_path) as f:
logger.info("Parsing archive metadata")
json_config = json.loads(f.read())
self.export_method = json_config.get("device", None)
self.encryption_method = json_config.get("encryption_method", None)
self.encryption_key = json_config.get("encryption_key", None)
logger.info(
"Target: {}, encryption_method {}".format(
self.export_method, self.encryption_method
)
)

except Exception as ex:
logger.error("Metadata parsing failure")
raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from ex

# Validate action - fails if command is not in list of supported commands
try:
logger.debug("Validate export action")
self.command = Command(self.export_method)
if (
self.command is Command.EXPORT
and self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS
):
logger.error("Unsupported encryption method")
raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA)
except ValueError as v:
raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v

return self


class Archive(object):
def __init__(self, archive_path: str):
os.umask(0o077)
self.archive = archive_path
self.target_dirname = "sd-export-{}".format(
datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
)
self.tmpdir = tempfile.mkdtemp()

def extract_tarball(self) -> "Archive":
"""
Extract tarball, checking for path traversal, and return Archive object.
"""
try:
logger.info(
"Extracting tarball {} into {}".format(self.archive, self.tmpdir)
)
safe_extractall(self.archive, self.tmpdir)
return self
except Exception as ex:
logger.error("Unable to extract tarball: {}".format(ex))
raise ExportException(sdstatus=Status.ERROR_EXTRACTION) from ex

def set_metadata(self, metadata: Metadata) -> "Archive":
"""
Set relevant metadata attributes for a given archive.
"""
self.command = metadata.command
if self.command is Command.EXPORT:
# When we support multiple encryption types, we will also want to add the
# encryption_method here
self.encryption_key = metadata.encryption_key
return self
18 changes: 18 additions & 0 deletions securedrop_export/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from enum import Enum


class Command(Enum):
"""
All supported commands.
Values are as supplied by the calling VM (sd-app), and a change in any values requires
corresponding changes in the calling VM.
"""

PRINTER_PREFLIGHT = "printer-preflight"
PRINTER_TEST = "printer-test"
PRINT = "printer"
CHECK_USBS = "usb-test"
CHECK_VOLUME = "disk-test"
EXPORT = "disk"
START_VM = ""
22 changes: 11 additions & 11 deletions securedrop_export/utils.py → securedrop_export/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ def safe_mkdir(
if not base_path.is_absolute():
raise ValueError(f"Base directory '{base_path}' must be an absolute path")

check_path_traversal(base_path)
_check_path_traversal(base_path)

if relative_path:
check_path_traversal(relative_path)
_check_path_traversal(relative_path)
full_path = base_path.joinpath(relative_path)
else:
full_path = base_path
Expand All @@ -35,7 +35,7 @@ def safe_mkdir(
#
# Note: We do not use parents=True because the parent directories will not be created with the
# specified mode. Parents are created using system default permissions, which we modify to be
# 700 via os.umask in the SDExport contructor. Creating directories one-by-one with mode=0o0700
# 700 via os.umask in the Archive contructor. Creating directories one-by-one with mode=0o0700
# is not necessary but adds defense in depth.
relative_path = relative_filepath(full_path, base_path)
for parent in reversed(relative_path.parents):
Expand All @@ -45,7 +45,7 @@ def safe_mkdir(
full_path.mkdir(mode=0o0700, exist_ok=True)

# Check permissions after creating the directories
check_all_permissions(relative_path, base_path)
_check_all_permissions(relative_path, base_path)


def safe_extractall(archive_file_path: str, dest_path: str) -> None:
Expand All @@ -65,14 +65,14 @@ def safe_extractall(archive_file_path: str, dest_path: str) -> None:
for file_info in tar.getmembers():
file_info.mode = 0o700 if file_info.isdir() else 0o600

check_path_traversal(file_info.name)
_check_path_traversal(file_info.name)

# If the path is relative then we don't need to check that it resolves to dest_path
if Path(file_info.name).is_absolute():
relative_filepath(file_info.name, dest_path)

if file_info.islnk() or file_info.issym():
check_path_traversal(file_info.linkname)
_check_path_traversal(file_info.linkname)
# If the path is relative then we don't need to check that it resolves to dest_path
if Path(file_info.linkname).is_absolute():
relative_filepath(file_info.linkname, dest_path)
Expand All @@ -92,7 +92,7 @@ def relative_filepath(filepath: Union[str, Path], base_dir: Union[str, Path]) ->
return Path(filepath).resolve().relative_to(base_dir)


def check_path_traversal(filename_or_filepath: Union[str, Path]) -> None:
def _check_path_traversal(filename_or_filepath: Union[str, Path]) -> None:
"""
Raise ValueError if filename_or_filepath does any path traversal. This works on filenames,
relative paths, and absolute paths.
Expand Down Expand Up @@ -121,7 +121,7 @@ def check_path_traversal(filename_or_filepath: Union[str, Path]) -> None:
raise ValueError(f"Unsafe file or directory name: '{filename_or_filepath}'")


def check_all_permissions(path: Union[str, Path], base_path: Union[str, Path]) -> None:
def _check_all_permissions(path: Union[str, Path], base_path: Union[str, Path]) -> None:
"""
Check that the permissions of each directory between base_path and path are set to 700.
"""
Expand All @@ -131,16 +131,16 @@ def check_all_permissions(path: Union[str, Path], base_path: Union[str, Path]) -
return

Path(full_path).chmod(0o700)
check_dir_permissions(full_path)
_check_dir_permissions(full_path)

relative_path = relative_filepath(full_path, base_path)
for parent in relative_path.parents:
full_path = base_path.joinpath(parent)
Path(full_path).chmod(0o700)
check_dir_permissions(str(full_path))
_check_dir_permissions(str(full_path))


def check_dir_permissions(dir_path: Union[str, Path]) -> None:
def _check_dir_permissions(dir_path: Union[str, Path]) -> None:
"""
Check that a directory has ``700`` as the final 3 bytes. Raises a ``RuntimeError`` otherwise.
"""
Expand Down
1 change: 1 addition & 0 deletions securedrop_export/disk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .service import Service # noqa: F401
Loading

0 comments on commit d03bf57

Please sign in to comment.