Skip to content

Commit

Permalink
Use filestatus for sOPS 3.9.0+ to determine whether a file that canno…
Browse files Browse the repository at this point in the history
…t be decrypted is unencrypted.
  • Loading branch information
felixfontein committed Jul 4, 2024
1 parent 5ed7954 commit 944b5a9
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelogs/fragments/190-sops-3.9.0.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
minor_changes:
- "Detect SOPS 3.9.0 and use new ``decrypt`` and ``encrypt`` subcommands (https://github.com/ansible-collections/community.sops/pull/190)."
- "sops vars plugin - new option ``handle_unencrypted_files`` allows to control behavior when encountering unencrypted files with SOPS 3.9.0+ (https://github.com/ansible-collections/community.sops/pull/190)."
bugfixes:
- "sops_encrypt - properly support ``path_regex`` in ``.sops.yaml`` when SOPS 3.9.0 or later is used (https://github.com/ansible-collections/community.sops/issues/153, https://github.com/ansible-collections/community.sops/pull/190)."
38 changes: 35 additions & 3 deletions plugins/module_utils/sops.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
__metaclass__ = type


import collections
import json
import os
import re

Expand Down Expand Up @@ -117,17 +119,22 @@ def f(value, arguments, env, version):
class SopsError(Exception):
''' Extend Exception class with sops specific information '''

def __init__(self, filename, exit_code, message, decryption=True):
def __init__(self, filename, exit_code, message, decryption=True, operation=None):
if operation is None:
operation = 'decrypt' if decryption else 'encrypt'
if exit_code in SOPS_ERROR_CODES:
exception_name = SOPS_ERROR_CODES[exit_code]
message = "error with file %s: %s exited with code %d: %s" % (
filename, exception_name, exit_code, to_native(message))
else:
message = "could not %s file %s; Unknown sops error code: %s; message: %s" % (
'decrypt' if decryption else 'encrypt', filename, exit_code, to_native(message))
operation, filename, exit_code, to_native(message))
super(SopsError, self).__init__(message)


SopsFileStatus = collections.namedtuple('SopsFileStatus', ['encrypted'])


class SopsRunner(object):
def _add_options(self, command, env, get_option_value, options):
if get_option_value is None:
Expand Down Expand Up @@ -155,12 +162,14 @@ def __init__(self, binary, module=None, display=None):
self.display = display

self.version = (3, 7, 3) # if --disable-version-check is not supported, this is version 3.7.3 or older
self.version_string = '(before 3.8.0)'

exit_code, output, err = self._run_command([self.binary, '--version', '--disable-version-check'])
if exit_code == 0:
m = _SOPS_VERSION.match(output.decode('utf-8'))
if m:
self.version = int(m.group(1)), int(m.group(2)), int(m.group(3))
self.version_string = '%d.%d.%d' % self.version
self._debug('SOPS version detected as %s' % (self.version, ))
else:
self._warn('Cannot extract SOPS version from: %s' % repr(output))
Expand Down Expand Up @@ -235,14 +244,37 @@ def encrypt(self, data, cwd=None, input_type=None, output_type=None, filename=No

# sops logs always to stderr, as stdout is used for
# file content
if err and self.display:
if err:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError('to stdout', exit_code, err, decryption=False)

return output

def has_filestatus(self):
return self.version >= (3, 9, 0)

def get_filestatus(self, path):
command = [self.binary, 'filestatus', path]

exit_code, output, err = self._run_command(command)

# sops logs always to stderr, as stdout is used for
# file content
if err:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError(path, exit_code, err, operation='inspect')

try:
result = json.loads(output)
return SopsFileStatus(result['encrypted'])
except Exception as exc:
self._debug(u'Unexpected stdout:\n' + to_text(output, errors='surrogate_or_strict'))
raise SopsError(path, 0, 'Cannot decode filestatus result: %s' % exc, operation='inspect')


_SOPS_RUNNER_CACHE = dict()

Expand Down
65 changes: 60 additions & 5 deletions plugins/vars/sops.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
description:
- Check all of these extensions when looking for 'variable' files.
- These files must be SOPS encrypted YAML or JSON files.
The plugin will produce errors when encountering files matching these extensions that are not SOPS encrypted.
(This might change in a future version.)
- By default the plugin will produce errors when encountering files matching these extensions that are not SOPS encrypted.
This behavior can be controlled with the O(handle_unencrypted_files) option.
type: list
elements: string
ini:
Expand Down Expand Up @@ -68,6 +68,27 @@
version_added: 1.3.0
env:
- name: SOPS_ANSIBLE_AWX_DISABLE_VARS_PLUGIN_TEMPORARILY
handle_unencrypted_files:
description:
- How to handle files that match the extensions in O(_valid_extensions) that are not SOPS encrypted.
- The default value V(error) will produce an error.
- The value V(skip) will simply skip these files. This requires SOPS 3.9.0 or later.
- The value V(warn) will skip these files and emit a warning. This requires SOPS 3.9.0 or later.
- B(Note) that this will not help if the store SOPS uses cannot parse the file, for example because it is
no valid JSON/YAML/... file despite its file extension. For extensions other than the default ones SOPS
uses the binary store, which tries to parse the file as JSON.
type: string
choices:
- skip
- warn
- error
default: error
version_added: 1.8.0
ini:
- key: handle_unencrypted_files
section: community.sops
env:
- name: ANSIBLE_VARS_SOPS_PLUGIN_HANDLE_UNENCRYPTED_FILES
extends_documentation_fragment:
- ansible.builtin.vars_plugin_staging
- community.sops.sops
Expand All @@ -90,7 +111,7 @@
from ansible.inventory.host import Host
from ansible.inventory.group import Group
from ansible.utils.vars import combine_vars
from ansible_collections.community.sops.plugins.module_utils.sops import Sops
from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError

from ansible.utils.display import Display
display = Display()
Expand Down Expand Up @@ -120,6 +141,7 @@ def get_option_value(argument_name):
return {}

valid_extensions = self.get_option('_valid_extensions')
handle_unencrypted_files = self.get_option('handle_unencrypted_files')

data = {}
for entity in entities:
Expand Down Expand Up @@ -162,13 +184,46 @@ def get_option_value(argument_name):
if cache and found in DECRYPTED:
file_content = DECRYPTED[found]
else:
file_content = Sops.decrypt(found, display=display, get_option_value=get_option_value)
sops_runner = Sops.get_sops_runner_from_options(get_option_value, display=display)
if handle_unencrypted_files != 'error' and not sops_runner.has_filestatus():
raise AnsibleParserError(
'Cannot use handle_unencrypted_files=%s with SOPS %s' % (handle_unencrypted_files, sops_runner.version_string)
)
try:
file_content = sops_runner.decrypt(found, get_option_value=get_option_value)
except SopsError as exc:
skip = False
if sops_runner.has_filestatus():
# Check whether sops thinks the file might be encrypted. If it thinks it is not,
# skip it. Otherwise, re-raise the original error
try:
file_status = sops_runner.get_filestatus(found)
if not file_status.encrypted:
if handle_unencrypted_files == 'skip':
self._display.vvvv("SOPS vars plugin: skipping unencrypted file %s" % found)
skip = True
elif handle_unencrypted_files == 'warn':
self._display.warning("SOPS vars plugin: skipping unencrypted file %s" % found)
skip = True
elif handle_unencrypted_files == 'error':
raise AnsibleParserError("SOPS vars plugin: file %s is not encrypted" % found)
except SopsError as status_exc:
# The filestatus operation can fail for example if sops cannot parse the file
# as JSON/YAML. In that case, also re-raise the original error
self._display.warning("SOPS vars plugin: cannot obtain file status of %s: %s" % (found, status_exc))
if skip:
continue
raise
DECRYPTED[found] = file_content
new_data = loader.load(file_content)
if new_data: # ignore empty files
data = combine_vars(data, new_data)

except Exception as e:
except AnsibleParserError:
raise
except SopsError as e:
raise AnsibleParserError(to_native(e))
except Exception as e:
raise AnsibleParserError('Unexpected error in the SOPS vars plugin: %s' % to_native(e))

return data

0 comments on commit 944b5a9

Please sign in to comment.