Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If user has manually unlocked USB drive, don't re-prompt for pasphrase #1462

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions securedrop_client/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@ def __init__(self, status: str):


class ExportStatus(Enum):
"""
Status codes representing results of export- and print- related calls.

Codes may represent failure states (`USB_BAD_PASSPHRASE`) or success states
(`USB_CONNECTED`).

Warning: do not make changes to existing values without reviewing
`securedrop-export/securedrop_export/export.py`.
"""

# On the way to success
USB_CONNECTED = "USB_CONNECTED"
DISK_ENCRYPTED = "USB_ENCRYPTED"

# (Success) Drive is compatible and unlocked; do not prompt for passphrase
USB_ENCRYPTED_UNLOCKED = "USB_ENCRYPTED_UNLOCKED"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned in the PR, in the future we'll refine this to return multiple status codes (LUKS_UNLOCKED, VERACRYPT_UNLOCKED)

# Not too far from success
USB_NOT_CONNECTED = "USB_NOT_CONNECTED"
BAD_PASSPHRASE = "USB_BAD_PASSPHRASE"
Expand All @@ -37,6 +50,9 @@ class ExportStatus(Enum):
PRINTER_NOT_FOUND = "ERROR_PRINTER_NOT_FOUND"
MISSING_PRINTER_URI = "ERROR_MISSING_PRINTER_URI"

# Used internally
WARNING_QUBESOS_NOT_DETECTED = "WARNING_QUBESOS_NOT_DETECTED"


class Export(QObject):
"""
Expand Down Expand Up @@ -66,10 +82,15 @@ class Export(QObject):
DISK_ENCRYPTION_KEY_NAME = "encryption_key"
DISK_EXPORT_DIR = "export_data"

# Set of supported states for external disks that pass initial usb testing
# (meaning, they are attached and configured with a supported encryption and partition
# scheme, and may optionally be unlocked).
SUPPORTED_DISK_STATUSES = [ExportStatus.DISK_ENCRYPTED, ExportStatus.USB_ENCRYPTED_UNLOCKED]

# Set up signals for communication with the GUI thread
begin_preflight_check = pyqtSignal()
preflight_check_call_failure = pyqtSignal(object)
preflight_check_call_success = pyqtSignal()
preflight_check_call_success = pyqtSignal(str)
begin_usb_export = pyqtSignal(list, str)
export_usb_call_failure = pyqtSignal(object)
export_usb_call_success = pyqtSignal()
Expand Down Expand Up @@ -204,28 +225,35 @@ def _run_usb_test(self, archive_dir: str) -> None:
if status != ExportStatus.USB_CONNECTED.value:
raise ExportError(status)

def _run_disk_test(self, archive_dir: str) -> None:
def _run_disk_test(self, archive_dir: str) -> str:
"""
Run disk-test.

Args:
archive_dir (str): The path to the directory in which to create the archive.

Returns:
status (str): the status code resulting from the disk-test.

Raises:
ExportError: Raised if the usb-test does not return a DISK_ENCRYPTED status.
ExportError: Raised if the disk-test does not return a supported status
(Currently supported: DISK_ENCRYPTED, USB_ENCRYPTED_UNLOCKED).
"""
archive_path = self._create_archive(archive_dir, self.DISK_TEST_FN, self.DISK_TEST_METADATA)

status = self._export_archive(archive_path)
if status != ExportStatus.DISK_ENCRYPTED.value:
if status not in (item.value for item in Export.SUPPORTED_DISK_STATUSES):
raise ExportError(status)
return status

def _run_disk_export(self, archive_dir: str, filepaths: List[str], passphrase: str) -> None:
"""
Run disk-test.
Run disk-export.

Args:
archive_dir (str): The path to the directory in which to create the archive.
filepaths (List[str]): the list of files to add to the archive
passphrase (str): passphrase for the encrypted USB.

Raises:
ExportError: Raised if the usb-test does not return a DISK_ENCRYPTED status.
Expand Down Expand Up @@ -256,6 +284,8 @@ def _run_print(self, archive_dir: str, filepaths: List[str]) -> None:
def run_preflight_checks(self) -> None:
"""
Run preflight checks to verify that the usb device is connected and luks-encrypted.
Emits disk status (an ExportStatus enum, whose value describes the encryption type and
whether the drive is already unlocked).
"""
with TemporaryDirectory() as temp_dir:
try:
Expand All @@ -264,10 +294,14 @@ def run_preflight_checks(self) -> None:
threading.current_thread().ident
)
)

# Checks to see if USB is connected
self._run_usb_test(temp_dir)
self._run_disk_test(temp_dir)

# Checks whether disk state is supported
disk_status = self._run_disk_test(temp_dir)
logger.debug("completed preflight checks: success")
self.preflight_check_call_success.emit()
self.preflight_check_call_success.emit(disk_status)
except ExportError as e:
logger.debug("completed preflight checks: failure")
self.preflight_check_call_failure.emit(e)
Expand Down
27 changes: 20 additions & 7 deletions securedrop_client/gui/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2465,8 +2465,8 @@ def _print_file(self) -> None:
self.controller.print_file(self.file_uuid)
self.close()

@pyqtSlot()
def _on_preflight_success(self) -> None:
@pyqtSlot(str)
def _on_preflight_success(self, status: str = None) -> None:
# If the continue button is disabled then this is the result of a background preflight check
self.stop_animate_header()
self.header_icon.update_image("printer.svg", svg_size=QSize(64, 64))
Expand Down Expand Up @@ -2696,20 +2696,33 @@ def _export_file(self, checked: bool = False) -> None:
self.passphrase_field.setDisabled(True)
self.controller.export_file_to_usb_drive(self.file_uuid, self.passphrase_field.text())

@pyqtSlot()
def _on_preflight_success(self) -> None:
# If the continue button is disabled then this is the result of a background preflight check
@pyqtSlot(str)
def _on_preflight_success(self, status: str = None) -> None:
self.stop_animate_header()
self.header_icon.update_image("savetodisk.svg", QSize(64, 64))
self.header.setText(self.ready_header)

# Check if drive is unlocked. If so, skip passowrd prompt screen.
is_drive_unlocked = status == ExportStatus.USB_ENCRYPTED_UNLOCKED.value

# If the continue button is disabled then this is the result of a background preflight check
if not self.continue_button.isEnabled():
self.continue_button.clicked.disconnect()
self.continue_button.clicked.connect(self._show_passphrase_request_message)
if is_drive_unlocked:
self.continue_button.clicked.connect(self._export_file)
else:
self.continue_button.clicked.connect(self._show_passphrase_request_message)
self.continue_button.setEnabled(True)
self.continue_button.setFocus()
return

self._show_passphrase_request_message()
# If the continue button is already enabled, either export files or
# prompt for passphrase and export files, depending on disk status.
else:
if is_drive_unlocked:
self._export_file()
else:
self._show_passphrase_request_message()

@pyqtSlot(object)
def _on_preflight_failure(self, error: ExportError) -> None:
Expand Down
24 changes: 17 additions & 7 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
SendReplyJobTimeoutError,
)
from securedrop_client.crypto import GpgHelper
from securedrop_client.export import Export
from securedrop_client.export import Export, ExportStatus
from securedrop_client.queue import ApiJobQueue
from securedrop_client.sync import ApiSync
from securedrop_client.utils import check_dir_permissions
Expand Down Expand Up @@ -933,31 +933,39 @@ def run_printer_preflight_checks(self) -> None:
"""
Run preflight checks to make sure the Export VM is configured correctly.
"""
logger.info("Running printer preflight check")

if not self.qubes:
self.export.printer_preflight_success.emit()
logger.warning("QubesOS not detected, skipping printer preflight check")
self.export.printer_preflight_success.emit(
ExportStatus.WARNING_QUBESOS_NOT_DETECTED.value
)
return

logger.info("Running printer preflight check")
self.export.begin_printer_preflight.emit()

def run_export_preflight_checks(self) -> None:
"""
Run preflight checks to make sure the Export VM is configured correctly.
"""
logger.info("Running export preflight check")

if not self.qubes:
self.export.preflight_check_call_success.emit()
logger.warning("QubesOS not detected; skipping export preflight check")
self.export.preflight_check_call_success.emit(
ExportStatus.WARNING_QUBESOS_NOT_DETECTED.value
)
return

logger.info("Running export preflight check")
self.export.begin_preflight_check.emit()

def export_file_to_usb_drive(self, file_uuid: str, passphrase: str) -> None:
def export_file_to_usb_drive(self, file_uuid: str, passphrase: str = "") -> None:
"""
Send the file specified by file_uuid to the Export VM with the user-provided passphrase for
unlocking the attached transfer device. If the file is missing, update the db so that
is_downloaded is set to False.

If the drive is already unlocked, send an empty string as the passphrase.
"""
file = self.get_file(file_uuid)
file_location = file.location(self.data_dir)
Expand All @@ -967,7 +975,9 @@ def export_file_to_usb_drive(self, file_uuid: str, passphrase: str) -> None:
return

if not self.qubes:
self.export.export_usb_call_success.emit()
self.export.export_usb_call_success.emit(
ExportStatus.WARNING_QUBESOS_NOT_DETECTED.value
)
return

self.export.begin_usb_export.emit([file_location], passphrase)
Expand Down
26 changes: 26 additions & 0 deletions tests/gui/test_widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3805,6 +3805,32 @@ def test_ExportDialog__on_preflight_success_when_continue_enabled(mocker, export
export_dialog._show_passphrase_request_message.assert_called_once_with()


def test_ExportDialog__on_preflight_success_drive_unlocked(mocker, export_dialog):
export_dialog._show_passphrase_request_message = mocker.MagicMock()
export_dialog.continue_button = mocker.MagicMock()
mocker.patch.object(export_dialog.continue_button, "isEnabled", return_value=False)
export_dialog.continue_button.clicked = mocker.MagicMock()

# Call with the signal that is emitted when a drive is already unlocked
export_dialog._on_preflight_success(status=ExportStatus.USB_ENCRYPTED_UNLOCKED.value)

export_dialog.continue_button.clicked.connect.assert_called_once_with(
export_dialog._export_file
)


def test_ExportDialog__on_preflight_success_drive_unlocked_continue_enabled(mocker, export_dialog):
export_dialog._show_passphrase_request_message = mocker.MagicMock()
export_dialog.continue_button.setEnabled(True)
mock_export_file = mocker.patch.object(export_dialog, "_export_file")

# Call with the signal that is emitted when a drive is already unlocked
export_dialog._on_preflight_success(status=ExportStatus.USB_ENCRYPTED_UNLOCKED.value)

export_dialog._show_passphrase_request_message.assert_not_called()
mock_export_file.assert_called_once_with()


def test_ExportDialog__on_preflight_success_enabled_after_preflight_success(mocker, export_dialog):
assert not export_dialog.continue_button.isEnabled()
export_dialog._on_preflight_success()
Expand Down
48 changes: 26 additions & 22 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from securedrop_client.export import Export, ExportError
from securedrop_client.export import Export, ExportError, ExportStatus


def test_run_printer_preflight(mocker):
Expand Down Expand Up @@ -50,7 +50,7 @@ def test__run_printer_preflight(mocker):
"""
Ensure _export_archive and _create_archive are called with the expected parameters,
_export_archive is called with the return value of _create_archive, and
_run_disk_test returns without error if 'USB_CONNECTED' is the return value of _export_archive.
_run_usb_test returns without error if 'USB_CONNECTED' is the return value of _export_archive.
"""
export = Export()
export._create_archive = mocker.MagicMock(return_value="mock_archive_path")
Expand All @@ -66,7 +66,7 @@ def test__run_printer_preflight(mocker):

def test__run_printer_preflight_raises_ExportError_if_not_empty_string(mocker):
"""
Ensure ExportError is raised if _run_disk_test returns anything other than 'USB_CONNECTED'.
Ensure ExportError is raised if _run_usb_test returns anything other than 'USB_CONNECTED'.
"""
export = Export()
export._create_archive = mocker.MagicMock(return_value="mock_archive_path")
Expand Down Expand Up @@ -199,7 +199,8 @@ def test_send_file_to_usb_device_error(mocker):
export.export_completed.emit.assert_called_once_with(["path1", "path2"])


def test_run_preflight_checks(mocker):
@pytest.mark.parametrize("supported_status", (i.value for i in Export.SUPPORTED_DISK_STATUSES))
def test_run_preflight_checks(mocker, supported_status):
"""
Ensure TemporaryDirectory is used when creating and sending the archives during the preflight
checks and that the success signal is emitted by Export.
Expand All @@ -210,14 +211,14 @@ def test_run_preflight_checks(mocker):
export = Export()
export.preflight_check_call_success = mocker.MagicMock()
export.preflight_check_call_success.emit = mocker.MagicMock()
_run_usb_export = mocker.patch.object(export, "_run_usb_test")
_run_disk_export = mocker.patch.object(export, "_run_disk_test")
_run_usb_test = mocker.patch.object(export, "_run_usb_test")
_run_disk_test = mocker.patch.object(export, "_run_disk_test", return_value=supported_status)

export.run_preflight_checks()

_run_usb_export.assert_called_once_with("mock_temp_dir")
_run_disk_export.assert_called_once_with("mock_temp_dir")
export.preflight_check_call_success.emit.assert_called_once_with()
_run_usb_test.assert_called_once_with("mock_temp_dir")
_run_disk_test.assert_called_once_with("mock_temp_dir")
export.preflight_check_call_success.emit.assert_called_once_with(supported_status)


def test_run_preflight_checks_error(mocker):
Expand All @@ -232,13 +233,13 @@ def test_run_preflight_checks_error(mocker):
export.preflight_check_call_failure = mocker.MagicMock()
export.preflight_check_call_failure.emit = mocker.MagicMock()
error = ExportError("bang!")
_run_usb_export = mocker.patch.object(export, "_run_usb_test")
_run_disk_export = mocker.patch.object(export, "_run_disk_test", side_effect=error)
_run_usb_test = mocker.patch.object(export, "_run_usb_test")
_run_disk_test = mocker.patch.object(export, "_run_disk_test", side_effect=error)

export.run_preflight_checks()

_run_usb_export.assert_called_once_with("mock_temp_dir")
_run_disk_export.assert_called_once_with("mock_temp_dir")
_run_usb_test.assert_called_once_with("mock_temp_dir")
_run_disk_test.assert_called_once_with("mock_temp_dir")
export.preflight_check_call_failure.emit.assert_called_once_with(error)


Expand All @@ -263,27 +264,30 @@ def test__run_disk_export(mocker):
)


def test__run_disk_export_raises_ExportError_if_not_empty_string(mocker):
def test__run_disk_export_raises_ExportError_if_not_supported_status(mocker):
"""
Ensure ExportError is raised if _run_disk_test returns anything other than ''.
Ensure ExportError is raised if _run_disk_test returns anything other than a
supported status.
"""
export = Export()
export._create_archive = mocker.MagicMock(return_value="mock_archive_path")
export._export_archive = mocker.MagicMock(return_value="SOMETHING_OTHER_THAN_EMPTY_STRING")
export._export_archive = mocker.MagicMock(return_value="DEFINITELY_NOT_SUPPORTED_DISK_STATUS")

with pytest.raises(ExportError):
export._run_disk_export("mock_archive_dir", ["mock_filepath"], "mock_passphrase")


def test__run_disk_test(mocker):
# We have to sort the list or pytest complains
@pytest.mark.parametrize("supported_status", (i.value for i in Export.SUPPORTED_DISK_STATUSES))
def test__run_disk_test(mocker, supported_status):
"""
Ensure _export_archive and _create_archive are called with the expected parameters,
_export_archive is called with the return value of _create_archive, and
_run_disk_test returns without error if 'USB_ENCRYPTED' is the ouput status of _export_archive.
_run_disk_test returns without error if a supported status is the ouput status of _export_archive.
"""
export = Export()
export._create_archive = mocker.MagicMock(return_value="mock_archive_path")
export._export_archive = mocker.MagicMock(return_value="USB_ENCRYPTED")
export._export_archive = mocker.MagicMock(return_value=supported_status)

export._run_disk_test("mock_archive_dir")

Expand All @@ -293,13 +297,13 @@ def test__run_disk_test(mocker):
)


def test__run_disk_test_raises_ExportError_if_not_USB_ENCRYPTED(mocker):
def test__run_disk_test_raises_ExportError_if_not_supported_status(mocker):
"""
Ensure ExportError is raised if _run_disk_test returns anything other than 'USB_ENCRYPTED'.
Ensure ExportError is raised if _run_disk_test returns an unsupported status.
"""
export = Export()
export._create_archive = mocker.MagicMock(return_value="mock_archive_path")
export._export_archive = mocker.MagicMock(return_value="SOMETHING_OTHER_THAN_USB_ENCRYPTED")
export._export_archive = mocker.MagicMock(return_value="DEFINITELY_NOT_SUPPORTED_DISK_STATUS")

with pytest.raises(ExportError):
export._run_disk_test("mock_archive_dir")
Expand Down