Skip to content

Commit

Permalink
Merge pull request #98 from Venafi/test-coverage
Browse files Browse the repository at this point in the history
Test coverage
  • Loading branch information
rvelaVenafi authored Feb 3, 2022
2 parents 182adb9 + 12099fd commit 4f0c8cd
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 31 deletions.
22 changes: 22 additions & 0 deletions .github/version_history.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[![Venafi](./images/Venafi_logo.png)](https://www.venafi.com/)

[![Apache 2.0 License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
![Community Supported](https://img.shields.io/badge/Support%20Level-Community-brightgreen)
![Compatible with TPP 17.3+ & VaaS](https://img.shields.io/badge/Compatibility-TPP%2017.3+%20%26%20VaaS-f9a90c)
[![pypi Downloads](https://img.shields.io/pypi/dw/vcert)](https://pypi.org/project/vcert/)
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=Venafi_vcert-python&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=Venafi_vcert-python)

_**This open source project is community-supported.** To report a problem or share an idea, use
**[Issues](../../issues)**; and if you have a suggestion for fixing the issue, please include those details, too.
In addition, use **[Pull Requests](../../pulls)** to contribute actual bug fixes or proposed enhancements.
We welcome and appreciate all contributions. Got questions or want to discuss something with our team?
**[Join us on Slack](https://join.slack.com/t/venafi-integrations/shared_invite/zt-i8fwc379-kDJlmzU8OiIQOJFSwiA~dg)**!_

# Venafi Collection for Ansible
## Version History

#### 0.14.0
* **Dropped support for Python2. New baseline is Python 3.6+**
* Minor bug fixes on Policy Management
* Added integration with sonarcloud for code analysis
* Created version history file
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ NOTE: While developing with vcert-python, it is helpful if you are using a virtu
install the vcert-python library from source in development mode with `pip install --editable`.
See https://packaging.python.org/guides/installing-using-pip-and-virtual-environments/

## Version History

[Check version history here](.github/version_history.md)

## License

Copyright © Venafi, Inc. All rights reserved.
Expand Down
4 changes: 2 additions & 2 deletions tests/resources/policy_specification.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ policy:
maxValidDays: 120
subject:
orgs:
- venafi_yaml
- venafi.com
orgUnits:
- DevOps_yaml
- DevOps
localities:
- Merida
states:
Expand Down
65 changes: 42 additions & 23 deletions tests/test_pm.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
PolicySpecification)
from vcert.policy.pm_cloud import CA_TYPE_DIGICERT, CA_TYPE_ENTRUST

POLICY_SPEC_JSON = 'resources/policy_specification.json'
POLICY_SPEC_YAML = 'resources/policy_specification.yaml'
# This values are loaded from the project root which is vcert-python, not tests folder
POLICY_SPEC_JSON = './tests/resources/policy_specification.json'
POLICY_SPEC_YAML = './tests/resources/policy_specification.yaml'
CA_TYPE_TPP = 'TPP'

log = logger.get_child("test-pm")
Expand All @@ -38,14 +39,12 @@
class TestParsers(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(TestParsers, self).__init__(*args, **kwargs)
self.json_file = _resolve_resources_path(POLICY_SPEC_JSON)
self.yaml_file = _resolve_resources_path(POLICY_SPEC_YAML)
self.json_file = POLICY_SPEC_JSON
self.yaml_file = POLICY_SPEC_YAML

def test_json_parsing(self):
# data = json_parser.parse_file(self.json_file)
# print_data = parse_policy_spec(data)
# pprint(print_data)
pass
ps = json_parser.parse_file(self.json_file)
self._assert_policy_spec(ps)

def test_json_serialization(self):
ps = PolicySpecification(policy=_get_policy_obj(), defaults=_get_defaults_obj())
Expand All @@ -55,23 +54,41 @@ def test_yaml_11_parsing(self):
pass

def test_yaml_12_parsing(self):
# data = yaml_parser.parse_file(self.yaml_file)
# print_data = parse_policy_spec(data)
# pprint(print_data)
pass
ps = yaml_parser.parse_file(self.yaml_file)
self._assert_policy_spec(ps)

def test_yaml_serialization(self):
ps = PolicySpecification(policy=_get_policy_obj(), defaults=_get_defaults_obj())
yaml_parser.serialize(ps, 'test_yaml_serialization.yaml')

def _assert_policy_spec(self, ps):
"""
:param vcert.policy.PolicySpecification ps:
:return:
"""
self.assertIsNotNone(ps)
self.assertIn("venafi.com", ps.policy.domains)
self.assertIn("kwan.com", ps.policy.domains)
self.assertIn("venafi.com", ps.policy.subject.orgs)
self.assertTrue(len(ps.policy.subject.orgs) == 1)
self.assertIn("DevOps", ps.policy.subject.org_units)
self.assertTrue(len(ps.policy.subject.org_units) == 1)
self.assertIn("Merida", ps.policy.subject.localities)
self.assertTrue(len(ps.policy.subject.localities) == 1)
self.assertIn("RSA", ps.policy.key_pair.key_types)
self.assertTrue(len(ps.policy.key_pair.key_types) == 1)
self.assertIn(2048, ps.policy.key_pair.rsa_key_sizes)
self.assertTrue(len(ps.policy.key_pair.rsa_key_sizes) == 1)


class TestTPPPolicyManagement(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, http_request_kwargs={'verify': "/tmp/chain.pem"})
auth = Authentication(user=TPP_USER, password=TPP_PASSWORD, scope=SCOPE_PM)
self.tpp_conn.get_access_token(auth)
self.json_file = _resolve_resources_path(POLICY_SPEC_JSON)
self.yaml_file = _resolve_resources_path(POLICY_SPEC_YAML)
self.json_file = POLICY_SPEC_JSON
self.yaml_file = POLICY_SPEC_YAML
super(TestTPPPolicyManagement, self).__init__(*args, **kwargs)

def test_create_policy_from_json(self):
Expand Down Expand Up @@ -108,8 +125,8 @@ def _create_policy_tpp(self, policy_spec=None, policy=None, defaults=None):
class TestCloudPolicyManagement(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.cloud_conn = CloudConnection(token=CLOUD_APIKEY, url=CLOUD_URL)
self.json_file = _resolve_resources_path(POLICY_SPEC_JSON)
self.yaml_file = _resolve_resources_path(POLICY_SPEC_YAML)
self.json_file = POLICY_SPEC_JSON
self.yaml_file = POLICY_SPEC_YAML
super(TestCloudPolicyManagement, self).__init__(*args, **kwargs)

def test_create_policy_from_json(self):
Expand Down Expand Up @@ -246,10 +263,12 @@ def _get_tpp_policy_name():
time = timestamp()
return f"{_get_app_name().format(time)}"


def _resolve_resources_path(path):
resources_dir = os.path.dirname(__file__)
log.debug(f"Testing root folder: [{resources_dir}]")
resolved_path = f"./{path}" if resources_dir.endswith('tests') else f"./tests/{path}"
log.debug(f"resolved path: [{resolved_path}]")
return resolved_path
# def _resolve_resources_path(path):
# resources_dir = os.path.dirname(__file__)
# log.debug(f"Testing root folder: [{resources_dir}]")
# if resources_dir.endswith('tests'):
# resolved_path = f"./{path}"
# else:
# resolved_path = f"./tests/{path}"
# log.debug(f"resolved path: [{resolved_path}]")
# return resolved_path
35 changes: 29 additions & 6 deletions tests/test_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import unittest

from assets import SSH_CERT_DATA, SSH_PRIVATE_KEY, SSH_PUBLIC_KEY
from test_env import TPP_TOKEN_URL, TPP_USER, TPP_PASSWORD, TPP_SSH_CADN
from test_env import TPP_TOKEN_URL, TPP_USER, TPP_PASSWORD, TPP_SSH_CADN, TPP_URL
from test_utils import timestamp
from vcert import (CommonConnection, SSHCertRequest, TPPTokenConnection, Authentication,
SCOPE_SSH, write_ssh_files, logger, venafi_connection, VenafiPlatform)
SCOPE_SSH, write_ssh_files, logger, venafi_connection, VenafiPlatform, TPPConnection)
from vcert.ssh_utils import SSHRetrieveResponse, SSHKeyPair, SSHCATemplateRequest

log = logger.get_child("test-ssh")
Expand All @@ -31,12 +31,12 @@
SSH_CERT_DATA_ERROR = "Certificate data is empty for Certificate {}" # type: str


class TestTPPSSHCertificate(unittest.TestCase):
class TestTPPTokenSSHCertificate(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, http_request_kwargs={'verify': "/tmp/chain.pem"})
auth = Authentication(user=TPP_USER, password=TPP_PASSWORD, scope=SCOPE_SSH)
self.tpp_conn.get_access_token(auth)
super(TestTPPSSHCertificate, self).__init__(*args, **kwargs)
super(TestTPPTokenSSHCertificate, self).__init__(*args, **kwargs)

def test_enroll_local_generated_keypair(self):
keypair = SSHKeyPair()
Expand Down Expand Up @@ -75,8 +75,20 @@ def test_retrieve_ca_public_key(self):
log.debug(f"{TPP_SSH_CADN} Public Key data:\n{ssh_config.ca_public_key}")

def test_retrieve_ca_public_key_and_principals(self):
request = SSHCATemplateRequest(ca_template=TPP_SSH_CADN)
ssh_config = self.tpp_conn.retrieve_ssh_config(ca_request=request)
ssh_config = _retrieve_ssh_config(self.tpp_conn)
self.assertIsNotNone(ssh_config.ca_public_key, f"{TPP_SSH_CADN} Public Key data is empty")
self.assertIsNotNone(ssh_config.ca_principals, f"{TPP_SSH_CADN} default principals is empty")
log.debug(f"{TPP_SSH_CADN} Public Key data: {ssh_config.ca_public_key}")
log.debug(f"{TPP_SSH_CADN} default principals: {ssh_config.ca_principals}")


class TestTPPSSHCertificate(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.tpp_conn = TPPConnection(TPP_USER, TPP_PASSWORD, TPP_URL, http_request_kwargs={'verify': "/tmp/chain.pem"})
super(TestTPPSSHCertificate, self).__init__(*args, **kwargs)

def test_retrieve_ca_public_key_and_principals(self):
ssh_config = _retrieve_ssh_config(self.tpp_conn)
self.assertIsNotNone(ssh_config.ca_public_key, f"{TPP_SSH_CADN} Public Key data is empty")
self.assertIsNotNone(ssh_config.ca_principals, f"{TPP_SSH_CADN} default principals is empty")
log.debug(f"{TPP_SSH_CADN} Public Key data: {ssh_config.ca_public_key}")
Expand Down Expand Up @@ -122,5 +134,16 @@ def _enroll_ssh_cert(connector, request):
return response


def _retrieve_ssh_config(connection):
"""
:param vcert.AbstractTPPConnection connection:
:rtype: vcert.SSHConfig
"""
request = SSHCATemplateRequest(ca_template=TPP_SSH_CADN)
ssh_config = connection.retrieve_ssh_config(ca_request=request)
return ssh_config


def _random_key_id():
return f"vcert-python-ssh-{timestamp()}"

0 comments on commit 4f0c8cd

Please sign in to comment.