Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce 4 new methods for the CredentialManager and deprecate some old ones #189

Merged
merged 2 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
repos:
- repo: https://github.com/asottile/pyupgrade
rev: v2.31.1
rev: v2.37.3
hooks:
- id: pyupgrade
args: [--py37-plus]
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 22.6.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
rev: 5.0.4
hooks:
- id: flake8
exclude: ^tests
- repo: https://github.com/myint/docformatter
rev: v1.4
rev: v1.5.0
hooks:
- id: docformatter
args: [--in-place]
95 changes: 73 additions & 22 deletions solnlib/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import json
import re
import warnings
from typing import Dict, List

from splunklib import binding
from splunklib import binding, client

from . import splunk_rest_client as rest_client
from .net_utils import validate_scheme_host_port
Expand Down Expand Up @@ -79,7 +81,7 @@ def __init__(
port: int = None,
**context: dict,
):
"""Initializes CredentialsManager.
"""Initializes CredentialManager.

Arguments:
session_key: Splunk access token.
Expand Down Expand Up @@ -123,9 +125,11 @@ def get_password(self, user: str) -> str:
realm='realm_test')
>>> cm.get_password('testuser2')
"""

all_passwords = self._get_all_passwords()
for password in all_passwords:
if self._realm is not None:
passwords = self.get_clear_passwords_in_realm()
else:
passwords = self.get_clear_passwords()
for password in passwords:
if password["username"] == user and password["realm"] == self._realm:
return password["clear_password"]

Expand Down Expand Up @@ -182,14 +186,16 @@ def _update_password(self, user: str, password: str):
self._storage_passwords.create(password, user, self._realm)
except binding.HTTPError as ex:
if ex.status == 409:
all_passwords = self._get_all_passwords_in_realm()
for pwd_stanza in all_passwords:
if self._realm is not None:
passwords = self.get_raw_passwords_in_realm()
else:
passwords = self.get_raw_passwords()
for pwd_stanza in passwords:
if pwd_stanza.realm == self._realm and pwd_stanza.username == user:
pwd_stanza.update(password=password)
return
raise ValueError(
"Can not get the password object for realm: %s user: %s"
% (self._realm, user)
f"Can not get the password object for realm: {self._realm} user: {user}"
)
else:
raise ex
Expand All @@ -211,25 +217,61 @@ def delete_password(self, user: str):
realm='realm_test')
>>> cm.delete_password('testuser1')
"""
all_passwords = self._get_all_passwords_in_realm()
if self._realm is not None:
passwords = self.get_raw_passwords_in_realm()
else:
passwords = self.get_raw_passwords()
deleted = False
ent_pattern = re.compile(
r"({}{}\d+)".format(user.replace("\\", "\\\\"), self.SEP)
)
for password in list(all_passwords):
for password in passwords:
match = (user == password.username) or ent_pattern.match(password.username)
if match and password.realm == self._realm:
password.delete()
deleted = True

if not deleted:
raise CredentialNotExistException(
"Failed to delete password of realm={}, user={}".format(
self._realm, user
)
f"Failed to delete password of realm={self._realm}, user={user}"
)

def _get_all_passwords_in_realm(self):
def get_raw_passwords(self) -> List[client.StoragePassword]:
"""Returns all passwords in the "raw" format."""
warnings.warn(
"Please pass realm to the CredentialManager, "
"so it can utilize get_raw_passwords_in_realm method instead."
)
return self._storage_passwords.list(count=-1)

def get_raw_passwords_in_realm(self) -> List[client.StoragePassword]:
"""Returns all passwords within the realm in the "raw" format."""
if self._realm is None:
raise ValueError("No realm was specified")
return self._storage_passwords.list(count=-1, search=f"realm={self._realm}")

def get_clear_passwords(self) -> List[Dict[str, str]]:
"""Returns all passwords in the "clear" format."""
warnings.warn(
"Please pass realm to the CredentialManager, "
"so it can utilize get_clear_passwords_in_realm method instead."
)
raw_passwords = self.get_raw_passwords()
return self._get_clear_passwords(raw_passwords)

def get_clear_passwords_in_realm(self) -> List[Dict[str, str]]:
"""Returns all passwords within the realm in the "clear" format."""
if self._realm is None:
raise ValueError("No realm was specified")
raw_passwords = self.get_raw_passwords_in_realm()
return self._get_clear_passwords(raw_passwords)

def _get_all_passwords_in_realm(self) -> List[client.StoragePassword]:
warnings.warn(
"_get_all_passwords_in_realm is deprecated, "
"please use get_raw_passwords_in_realm instead.",
stacklevel=2,
)
if self._realm:
all_passwords = self._storage_passwords.list(
count=-1, search=f"realm={self._realm}"
Expand All @@ -238,13 +280,12 @@ def _get_all_passwords_in_realm(self):
all_passwords = self._storage_passwords.list(count=-1, search="")
return all_passwords

@retry(exceptions=[binding.HTTPError])
def _get_all_passwords(self):
all_passwords = self._storage_passwords.list(count=-1)

def _get_clear_passwords(
self, passwords: List[client.StoragePassword]
) -> List[Dict[str, str]]:
results = {}
ptn = re.compile(rf"(.+){self.SEP}(\d+)")
for password in all_passwords:
for password in passwords:
match = ptn.match(password.name)
if match:
actual_name = match.group(1) + ":"
Expand All @@ -263,7 +304,7 @@ def _get_all_passwords(self):

# Backward compatibility
# To deal with the password with only one stanza which is generated by the old version.
for password in all_passwords:
for password in passwords:
match = ptn.match(password.name)
if (not match) and (password.name not in results):
results[password.name] = {
Expand All @@ -289,6 +330,16 @@ def _get_all_passwords(self):

return list(results.values())

@retry(exceptions=[binding.HTTPError])
def _get_all_passwords(self) -> List[Dict[str, str]]:
warnings.warn(
"_get_all_passwords is deprecated, "
"please use get_all_passwords_in_realm instead.",
stacklevel=2,
)
passwords = self._storage_passwords.list(count=-1)
return self._get_clear_passwords(passwords)


@retry(exceptions=[binding.HTTPError])
def get_session_key(
Expand Down Expand Up @@ -317,7 +368,7 @@ def get_session_key(
ValueError: if scheme, host or port are invalid.

Examples:
>>> credentials.get_session_key('user', 'password')
>>> get_session_key('user', 'password')
"""
validate_scheme_host_port(scheme, host, port)

Expand Down
118 changes: 100 additions & 18 deletions tests/integration/test_conf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
from solnlib import conf_manager


def test_conf_manager():
session_key = context.get_session_key()
cfm = conf_manager.ConfManager(
def _build_conf_manager(session_key: str) -> conf_manager.ConfManager:
return conf_manager.ConfManager(
session_key,
context.app,
owner=context.owner,
Expand All @@ -37,23 +36,106 @@ def test_conf_manager():
port=context.port,
)

try:
conf = cfm.get_conf("test")
except conf_manager.ConfManagerException:
conf = cfm.create_conf("test")

assert not conf.stanza_exist("test_stanza")
conf.update("test_stanza", {"k1": 1, "k2": 2}, ["k1"])
assert conf.get("test_stanza")["k1"] == 1
assert int(conf.get("test_stanza")["k2"]) == 2
assert conf.get("test_stanza")["eai:appName"] == "solnlib_demo"
assert len(conf.get_all()) == 1
conf.delete("test_stanza")

def test_conf_manager_when_no_conf_then_throw_exception():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

with pytest.raises(conf_manager.ConfManagerException):
cfm.get_conf("non_existent_configuration_file")


def test_conf_manager_when_conf_file_exists_but_no_specific_stanza_then_throw_exception():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

with pytest.raises(conf_manager.ConfStanzaNotExistException):
conf.get("test_stanza")
splunk_ta_addon_settings_conf_file.get(
"non_existent_stanza_under_existing_conf_file"
)


@pytest.mark.parametrize(
"stanza_name,expected_result",
[
("logging", True),
("non_existent_stanza_under_existing_conf_file", False),
],
)
def test_conf_manager_stanza_exist(stanza_name, expected_result):
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

assert (
splunk_ta_addon_settings_conf_file.stanza_exist(stanza_name) == expected_result
)


def test_conf_manager_when_conf_file_exists():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

expected_result = {
"disabled": "0",
"eai:access": {
"app": "solnlib_demo",
"can_change_perms": "1",
"can_list": "1",
"can_share_app": "1",
"can_share_global": "1",
"can_share_user": "0",
"can_write": "1",
"modifiable": "1",
"owner": "nobody",
"perms": {"read": ["*"], "write": ["admin"]},
"removable": "0",
"sharing": "global",
},
"eai:appName": "solnlib_demo",
"eai:userName": "nobody",
"log_level": "DEBUG",
}
assert splunk_ta_addon_settings_conf_file.get("logging") == expected_result


def test_conf_manager_delete_non_existent_stanza_then_throw_exception():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

with pytest.raises(conf_manager.ConfStanzaNotExistException):
conf.delete("test_stanza")
conf.reload()
splunk_ta_addon_settings_conf_file.delete(
"non_existent_stanza_under_existing_conf_file"
)


def test_conf_manager_create_conf():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

conf_file = cfm.create_conf("conf_file_that_did_not_exist_before")
conf_file.update("stanza", {"key": "value"})

assert conf_file.get("stanza")["key"] == "value"


def test_conf_manager_update_conf_with_encrypted_keys():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

conf_file = cfm.create_conf("conf_file_with_encrypted_keys")
conf_file.update(
"stanza", {"key1": "value1", "key2": "value2"}, encrypt_keys=["key2"]
)

assert conf_file.get("stanza")["key2"] == "value2"


def test_get_log_level():
Expand Down
Loading