From 55b439f4b2131aed62abe1c8140c1deb525cdeb0 Mon Sep 17 00:00:00 2001 From: Danny Grove Date: Mon, 3 Jun 2019 22:31:42 -0700 Subject: [PATCH 1/2] Add server and config subcommands --- cli.py | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++------ mtls.py | 19 ++++++---- 2 files changed, 114 insertions(+), 19 deletions(-) diff --git a/cli.py b/cli.py index 3090f85..1e35cc0 100644 --- a/cli.py +++ b/cli.py @@ -1,13 +1,12 @@ import datetime import os import sys +from configparser import ConfigParser import click from mtls import MutualTLS -import os -import sys try: VERSION = open(os.path.join(sys._MEIPASS, 'VERSION')).read().strip() @@ -20,6 +19,11 @@ 'generation tool based on Googles Beyond Corp Zero Trust ' 'Authentication. Version {}'.format(VERSION)) +ALLOWED_KEYS = [ + 'name', 'email', 'host', 'fingerprint', 'country', 'state', 'locality', + 'common_name', 'organization_name', 'lifetime', 'url' +] + @click.group(help=HELP_TEXT) @click.version_option(VERSION, message="%(version)s") @@ -31,7 +35,8 @@ @click.option( '--config', '-c', type=click.Path(exists=True), - help='config file. [~/.config/mtls]' + default=os.path.join(os.environ.get('HOME'), '.config/mtls/config.ini'), + help='config file. [~/.config/mtls/config.ini]' ) @click.option( '--gpg-password', @@ -51,6 +56,11 @@ def cli( } if server is not None: ctx.obj = MutualTLS(server, options) + else: + ctx.obj = { + 'config_path': config, + 'server': server or 'DEFAULT' + } if sys.platform == 'win32' or sys.platform == 'cygwin': click.secho( 'Your platform is not currently supported', @@ -58,6 +68,84 @@ def cli( ) +@cli.command(help='Manage config') +@click.argument('key') +@click.argument('value') +@click.pass_context +def config(ctx, key, value): + # Deal with not actually instanting the MutualTLS class. + try: + server = ctx.obj.server or 'DEFAULT' + config_path = ctx.obj.config_file_path + except Exception as err: + server = ctx.obj['server'] + config_path = ctx.obj['config_path'] + + if key not in ALLOWED_KEYS: + click.secho( + 'Your key must be in the allowed keys, available options are: {}' + .format(",".join(ALLOWED_KEYS)), + fg='red' + ) + sys.exit(1) + if server == 'DEFAULT' and key == 'url': + click.secho( + 'url is not a valid config when no server is set', + fg='red' + ) + sys.exit(1) + config = ConfigParser() + config.read(config_path) + config.set(server, key, value) + with open(config_path, 'w') as config_file: + config.write(config_file) + + +@click.group(help='Manage Servers') +@click.pass_context +def server(ctx): + pass + + +@server.command('add', help="Add a server") +@click.argument('name') +@click.pass_context +def add_server(ctx, name): + if name is None or name == "": + click.secho('Server name cannot be empty', fg='red') + if " " in name: + click.secho('Server name cannot have space in it.', fg='red') + sys.exit(1) + config_path = ctx.obj['config_path'] + value = click.prompt( + 'What is the url of the Certificate Authority? (ie. ' + + 'https://certauth.example.com): ' + ) + config = ConfigParser() + config.read(config_path) + config.add_section(name) + config.set(name, 'url', value) + with open(config_path, 'w') as config_file: + config.write(config_file) + + +@server.command('remove', help="Remove a server") +@click.argument('name') +@click.pass_context +def remove_server(ctx, name): + if name is None or name == "": + click.secho('Server name cannot be empty', fg='red') + if " " in name: + click.secho('Server name cannot have space in it.', fg='red') + sys.exit(1) + config_path = ctx.obj['config_path'] + config = ConfigParser() + config.read(config_path) + config.remove_section(name) + with open(config_path, 'w') as config_file: + config.write(config_file) + + @click.group(help='Manage Certificates') @click.pass_context def certificate(ctx): @@ -102,7 +190,7 @@ def create_certificate( organization, ): options = {} - if ctx.obj is None: + if not isinstance(ctx.obj, MutualTLS): click.secho('A server was not provided.', fg='red') sys.exit(1) if friendly_name: @@ -139,7 +227,7 @@ def create_certificate( ) @click.pass_context def revoke_certificate(ctx, fingerprint, serial_number, name): - if ctx.obj is None: + if not isinstance(ctx.obj, MutualTLS): click.secho('A server was not provided.', fg='red') sys.exit(1) ctx.obj.revoke_cert(fingerprint, serial_number, name) @@ -156,7 +244,7 @@ def revoke_certificate(ctx, fingerprint, serial_number, name): ) @click.pass_context def get_crl(ctx, output): - if ctx.obj is None: + if not isinstance(ctx.obj, MutualTLS): click.secho('A server was not provided.', fg='red') sys.exit(1) ctx.obj.get_crl(output) @@ -195,7 +283,7 @@ def user(ctx): ) @click.pass_context def add_user(ctx, admin, fingerprint, email, keyserver): - if ctx.obj is None: + if not isinstance(ctx.obj, MutualTLS): click.secho('A server was not provided.', fg='red') sys.exit(1) if fingerprint is None and email is None: @@ -233,7 +321,7 @@ def add_user(ctx, admin, fingerprint, email, keyserver): ) @click.pass_context def remove_user(ctx, admin, fingerprint, email, keyserver): - if ctx.obj is None: + if not isinstance(ctx.obj, MutualTLS): click.secho('A server was not provided.', fg='red') sys.exit(1) if fingerprint is None and email is None: @@ -245,10 +333,6 @@ def remove_user(ctx, admin, fingerprint, email, keyserver): ctx.obj.remove_user(fingerprint, admin) -cli.add_command(certificate) -cli.add_command(user) - - def handle_email(ctx, email, keyserver=None): if keyserver: search_res = ctx.obj.gpg.search_keys(email, keyserver=keyserver) @@ -279,6 +363,12 @@ def handle_email(ctx, email, keyserver=None): return non_expired[value]['keyid'] +# Bind the subcommands to the cli +cli.add_command(certificate) +cli.add_command(user) +cli.add_command(server) + + if __name__ == '__main__': # main() cli() diff --git a/mtls.py b/mtls.py index 2588127..2fb9186 100644 --- a/mtls.py +++ b/mtls.py @@ -655,13 +655,18 @@ def get_config(self): config.read(self.config_file_path) return config - def update_config(self): - click.secho( - 'Updating config file settings for {server}'.format( - server=self.server - ), - fg='green' - ) + def update_config_value(self, key, value, namespace="DEFAULT"): + config.set(namespace, key, value) + self.update_config(show_msg=False) + + def update_config(self, show_msg=True): + if show_msg: + click.secho( + 'Updating config file settings for {server}'.format( + server=self.server + ), + fg='green' + ) with open(self.config_file_path, 'w') as config_file: self.config.write(config_file) From 9de58f554d66f30321b3158f6e2f87bfb61b64cf Mon Sep 17 00:00:00 2001 From: Danny Grove Date: Mon, 3 Jun 2019 23:28:38 -0700 Subject: [PATCH 2/2] Add test for server add/remove and config modfication --- test/test_cli.py | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/test/test_cli.py b/test/test_cli.py index d0d71b8..b68977f 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -1074,3 +1074,83 @@ def get_crl_to_file(self): "-----END X509 CRL-----", crl.public_bytes(serialization.Encoding.PEM).decode('UTF-8') ) + + +class TestCliOptions(TestCliBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.env = { + 'GNUPGHOME': cls.USER_GNUPGHOME.name, + 'HOME': cls.HOME.name, + 'USER': 'test', + 'HOST': str(platform.uname()[1]) + } + cls.runner = CliRunner(env=cls.env) + cls.config = ConfigParser() + cls.config['DEFAULT'] = { + 'name': 'John Doe', + 'email': 'johndoe@example.com', + 'fingerprint': cls.user.pgp_key.fingerprint, + 'country': 'US', + 'state': 'CA', + 'locality': 'Mountain View', + 'organization_name': 'My Org' + } + cls.config['test'] = { + 'lifetime': 60, + 'url': 'http://localhost:4000', + } + cls.config_path = os.path.join( + cls.HOME.name, + 'config.ini' + ) + with open(cls.config_path, 'w') as configfile: + cls.config.write(configfile) + + def test_add_server(self): + server_url = "https://certauth.foo.bar" + result = self.runner.invoke( + cli, + [ + '-c', + self.config_path, + 'server', + 'add', + 'foo' + ], + input=server_url + '\n' + ) + self.assertEqual(result.exit_code, 0, msg=result.exc_info) + config = ConfigParser() + config.read(self.config_path) + self.assertEqual(config.get('foo', 'url'), server_url) + + def test_remove_server(self): + result = self.runner.invoke(cli, [ + '-c', + self.config_path, + 'server', + 'remove', + 'foo' + ]) + self.assertEqual(result.exit_code, 0, msg=result.exc_info) + config = ConfigParser() + config.read(self.config_path) + self.assertFalse(config.has_section('foo')) + + def test_set_user_config(self): + result = self.runner.invoke(cli, [ + '-c', + self.config_path, + 'config', + 'organization_name', + 'My New Org' + ]) + self.assertEqual(result.exit_code, 0, msg=result.exc_info) + config = ConfigParser() + config.read(self.config_path) + self.assertEqual( + config.get('DEFAULT', 'organization_name'), + 'My New Org' + )