Skip to content

Commit

Permalink
feat: introduce 4 new methods for the CredentialManager and deprecate…
Browse files Browse the repository at this point in the history
… some old ones

This commit introduces 4 new methods for the CredentialManager: get_raw_passwords, get_raw_passwords_in_realm, get_clear_passwords, get_clear_passwords_in_realm which now should be used to access passwords through CredentialManager.

Some old methods like _get_all_passwords_in_realm and _get_all_passwords were deprecated.
  • Loading branch information
artemrys committed Sep 1, 2022
1 parent 24a960c commit 9b21afc
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 380 deletions.
95 changes: 73 additions & 22 deletions solnlib/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import json
import re
import warnings
from typing import Dict, List

from splunklib import binding
from splunklib import binding, client

from . import splunk_rest_client as rest_client
from .net_utils import validate_scheme_host_port
Expand Down Expand Up @@ -79,7 +81,7 @@ def __init__(
port: int = None,
**context: dict,
):
"""Initializes CredentialsManager.
"""Initializes CredentialManager.
Arguments:
session_key: Splunk access token.
Expand Down Expand Up @@ -123,9 +125,11 @@ def get_password(self, user: str) -> str:
realm='realm_test')
>>> cm.get_password('testuser2')
"""

all_passwords = self._get_all_passwords()
for password in all_passwords:
if self._realm is not None:
passwords = self.get_clear_passwords_in_realm()
else:
passwords = self.get_clear_passwords()
for password in passwords:
if password["username"] == user and password["realm"] == self._realm:
return password["clear_password"]

Expand Down Expand Up @@ -182,14 +186,16 @@ def _update_password(self, user: str, password: str):
self._storage_passwords.create(password, user, self._realm)
except binding.HTTPError as ex:
if ex.status == 409:
all_passwords = self._get_all_passwords_in_realm()
for pwd_stanza in all_passwords:
if self._realm is not None:
passwords = self.get_raw_passwords_in_realm()
else:
passwords = self.get_raw_passwords()
for pwd_stanza in passwords:
if pwd_stanza.realm == self._realm and pwd_stanza.username == user:
pwd_stanza.update(password=password)
return
raise ValueError(
"Can not get the password object for realm: %s user: %s"
% (self._realm, user)
f"Can not get the password object for realm: {self._realm} user: {user}"
)
else:
raise ex
Expand All @@ -211,25 +217,61 @@ def delete_password(self, user: str):
realm='realm_test')
>>> cm.delete_password('testuser1')
"""
all_passwords = self._get_all_passwords_in_realm()
if self._realm is not None:
passwords = self.get_raw_passwords_in_realm()
else:
passwords = self.get_raw_passwords()
deleted = False
ent_pattern = re.compile(
r"({}{}\d+)".format(user.replace("\\", "\\\\"), self.SEP)
)
for password in list(all_passwords):
for password in passwords:
match = (user == password.username) or ent_pattern.match(password.username)
if match and password.realm == self._realm:
password.delete()
deleted = True

if not deleted:
raise CredentialNotExistException(
"Failed to delete password of realm={}, user={}".format(
self._realm, user
)
f"Failed to delete password of realm={self._realm}, user={user}"
)

def _get_all_passwords_in_realm(self):
def get_raw_passwords(self) -> List[client.StoragePassword]:
"""Returns all passwords in the "raw" format."""
warnings.warn(
"Please pass realm to the CredentialManager, "
"so it can utilize get_raw_passwords_in_realm method instead."
)
return self._storage_passwords.list(count=-1)

def get_raw_passwords_in_realm(self) -> List[client.StoragePassword]:
"""Returns all passwords within the realm in the "raw" format."""
if self._realm is None:
raise ValueError("No realm was specified")
return self._storage_passwords.list(count=-1, search=f"realm={self._realm}")

def get_clear_passwords(self) -> List[Dict[str, str]]:
"""Returns all passwords in the "clear" format."""
warnings.warn(
"Please pass realm to the CredentialManager, "
"so it can utilize get_clear_passwords_in_realm method instead."
)
raw_passwords = self.get_raw_passwords()
return self._get_clear_passwords(raw_passwords)

def get_clear_passwords_in_realm(self) -> List[Dict[str, str]]:
"""Returns all passwords within the realm in the "clear" format."""
if self._realm is None:
raise ValueError("No realm was specified")
raw_passwords = self.get_raw_passwords_in_realm()
return self._get_clear_passwords(raw_passwords)

def _get_all_passwords_in_realm(self) -> List[client.StoragePassword]:
warnings.warn(
"_get_all_passwords_in_realm is deprecated, "
"please use get_raw_passwords_in_realm instead.",
stacklevel=2,
)
if self._realm:
all_passwords = self._storage_passwords.list(
count=-1, search=f"realm={self._realm}"
Expand All @@ -238,13 +280,12 @@ def _get_all_passwords_in_realm(self):
all_passwords = self._storage_passwords.list(count=-1, search="")
return all_passwords

@retry(exceptions=[binding.HTTPError])
def _get_all_passwords(self):
all_passwords = self._storage_passwords.list(count=-1)

def _get_clear_passwords(
self, passwords: List[client.StoragePassword]
) -> List[Dict[str, str]]:
results = {}
ptn = re.compile(rf"(.+){self.SEP}(\d+)")
for password in all_passwords:
for password in passwords:
match = ptn.match(password.name)
if match:
actual_name = match.group(1) + ":"
Expand All @@ -263,7 +304,7 @@ def _get_all_passwords(self):

# Backward compatibility
# To deal with the password with only one stanza which is generated by the old version.
for password in all_passwords:
for password in passwords:
match = ptn.match(password.name)
if (not match) and (password.name not in results):
results[password.name] = {
Expand All @@ -289,6 +330,16 @@ def _get_all_passwords(self):

return list(results.values())

@retry(exceptions=[binding.HTTPError])
def _get_all_passwords(self) -> List[Dict[str, str]]:
warnings.warn(
"_get_all_passwords is deprecated, "
"please use get_all_passwords_in_realm instead.",
stacklevel=2,
)
passwords = self._storage_passwords.list(count=-1)
return self._get_clear_passwords(passwords)


@retry(exceptions=[binding.HTTPError])
def get_session_key(
Expand Down Expand Up @@ -317,7 +368,7 @@ def get_session_key(
ValueError: if scheme, host or port are invalid.
Examples:
>>> credentials.get_session_key('user', 'password')
>>> get_session_key('user', 'password')
"""
validate_scheme_host_port(scheme, host, port)

Expand Down
118 changes: 100 additions & 18 deletions tests/integration/test_conf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
from solnlib import conf_manager


def test_conf_manager():
session_key = context.get_session_key()
cfm = conf_manager.ConfManager(
def _build_conf_manager(session_key: str) -> conf_manager.ConfManager:
return conf_manager.ConfManager(
session_key,
context.app,
owner=context.owner,
Expand All @@ -37,23 +36,106 @@ def test_conf_manager():
port=context.port,
)

try:
conf = cfm.get_conf("test")
except conf_manager.ConfManagerException:
conf = cfm.create_conf("test")

assert not conf.stanza_exist("test_stanza")
conf.update("test_stanza", {"k1": 1, "k2": 2}, ["k1"])
assert conf.get("test_stanza")["k1"] == 1
assert int(conf.get("test_stanza")["k2"]) == 2
assert conf.get("test_stanza")["eai:appName"] == "solnlib_demo"
assert len(conf.get_all()) == 1
conf.delete("test_stanza")

def test_conf_manager_when_no_conf_then_throw_exception():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

with pytest.raises(conf_manager.ConfManagerException):
cfm.get_conf("non_existent_configuration_file")


def test_conf_manager_when_conf_file_exists_but_no_specific_stanza_then_throw_exception():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

with pytest.raises(conf_manager.ConfStanzaNotExistException):
conf.get("test_stanza")
splunk_ta_addon_settings_conf_file.get(
"non_existent_stanza_under_existing_conf_file"
)


@pytest.mark.parametrize(
"stanza_name,expected_result",
[
("logging", True),
("non_existent_stanza_under_existing_conf_file", False),
],
)
def test_conf_manager_stanza_exist(stanza_name, expected_result):
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

assert (
splunk_ta_addon_settings_conf_file.stanza_exist(stanza_name) == expected_result
)


def test_conf_manager_when_conf_file_exists():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

expected_result = {
"disabled": "0",
"eai:access": {
"app": "solnlib_demo",
"can_change_perms": "1",
"can_list": "1",
"can_share_app": "1",
"can_share_global": "1",
"can_share_user": "0",
"can_write": "1",
"modifiable": "1",
"owner": "nobody",
"perms": {"read": ["*"], "write": ["admin"]},
"removable": "0",
"sharing": "global",
},
"eai:appName": "solnlib_demo",
"eai:userName": "nobody",
"log_level": "DEBUG",
}
assert splunk_ta_addon_settings_conf_file.get("logging") == expected_result


def test_conf_manager_delete_non_existent_stanza_then_throw_exception():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

splunk_ta_addon_settings_conf_file = cfm.get_conf("splunk_ta_addon_settings")

with pytest.raises(conf_manager.ConfStanzaNotExistException):
conf.delete("test_stanza")
conf.reload()
splunk_ta_addon_settings_conf_file.delete(
"non_existent_stanza_under_existing_conf_file"
)


def test_conf_manager_create_conf():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

conf_file = cfm.create_conf("conf_file_that_did_not_exist_before")
conf_file.update("stanza", {"key": "value"})

assert conf_file.get("stanza")["key"] == "value"


def test_conf_manager_update_conf_with_encrypted_keys():
session_key = context.get_session_key()
cfm = _build_conf_manager(session_key)

conf_file = cfm.create_conf("conf_file_with_encrypted_keys")
conf_file.update(
"stanza", {"key1": "value1", "key2": "value2"}, encrypt_keys=["key2"]
)

assert conf_file.get("stanza")["key2"] == "value2"


def test_get_log_level():
Expand Down
Loading

0 comments on commit 9b21afc

Please sign in to comment.