Creating a Module

Daniel Lawson edited this page Feb 1, 2019 · 13 revisions

Overview

Armory's usefulness comes from being able to quickly code up a module for a tool and ingest the output for useful data. This data can then be inserted into the database, and used with other tools or reports. Throughout this page, we use the following terminology:

  • Module: This is a Python file that lives either in armory/included/modules or in a custom module folder. Modules are made available via Armory and are used to do something, such as run a tool.
  • Report: This is a Python file that lives in armory/included/reports or in a custom report folder. Reports are also made available via Armory. They present data back to the user in some format, and can query the database or even process tool output directly.
  • Template: Templates are basic skeleton classes sitting in armory/included/. All of the modules and reports inherit from a template for their functionality. The templates usually contain generic code for common tasks. Anything in the template can be overridden in the module/report itself.
  • Tool: This is some third party application that isn't a part of Armory, but that a module can run and process the output of.

Right now, modules are based off of templates in armory/included/ModuleTemplate.py. The two templates are as follows:

  • ModuleTemplate: Basic template with almost nothing built in. This is what you would base any "out-of-the-box" modules on.
  • ToolTemplate: Template designed to allow one to easily create a multithreaded module for an arbitrary tool, and process the output.

How a Module Works

The following is the basic SampleModule. It contains the bare minimum to have a module do something.

#!/usr/bin/python

from armory.included.ModuleTemplate import ModuleTemplate

class Module(ModuleTemplate):
    
    name = "SampleModule"

    def set_options(self):
        super(Module, self).set_options()

        self.options.add_argument('-p', '--print_message', help="Message to print")

    def run(self, args):
        print("Running!")
        if args.print_message:
            print("Printing message")
            print(args.print_message)

The import at the top pulls in the base template. The set_options function defines additional command line options. The real work happens in the run function, which is executed when a user runs the module from Armory. To make things a little bit easier, we have a ToolTemplate that, instead of run, gives you three functions: one for generating targets, one for building a command, and one for processing the output.
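To make the flow from set_options to run concrete, here is a minimal sketch that can be run outside of Armory. The ModuleTemplate class and the launch helper below are hypothetical stand-ins written for this example (the real template and launcher are not shown on this page); only the Module class mirrors the sample above.

```python
import argparse


class ModuleTemplate(object):
    """Hypothetical stand-in for Armory's ModuleTemplate, for illustration only."""

    name = "Template"

    def __init__(self):
        self.options = argparse.ArgumentParser(prog=self.name)

    def set_options(self):
        # The real template may register shared options here; this stub adds none.
        pass

    def run(self, args):
        raise NotImplementedError


class Module(ModuleTemplate):
    name = "SampleModule"

    def set_options(self):
        super(Module, self).set_options()
        self.options.add_argument('-p', '--print_message', help="Message to print")

    def run(self, args):
        print("Running!")
        if args.print_message:
            print(args.print_message)


def launch(module, argv):
    """Mimic a launcher: register options, parse argv, then hand the result to run()."""
    module.set_options()
    args = module.options.parse_args(argv)
    module.run(args)
    return args


parsed = launch(Module(), ['-p', 'hello'])
```

The point of the sketch is only the ordering: options are registered first, the command line is parsed, and the parsed namespace is what run receives as args.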

Custom Module Using ToolTemplate

Workflow

The run function of the ToolTemplate looks like the following:

def run(self, args):
        if args.tool_args:
            args.tool_args = ' '.join(args.tool_args)
        if not args.binary:
            self.binary = which.run(self.binary_name)
        else:
            self.binary = which.run(args.binary)

        if not self.binary:
            print("%s binary not found. Please explicitly provide path with --binary" % self.name)

        else:
            if args.timeout:
                timeout = int(args.timeout)
            else:
                timeout = None
            targets = self.get_targets(args)
            
            if not args.no_binary:
                cmd = self.build_cmd(args)
                cmds = [shlex.split(cmd.format(**t)) + [timeout] for t in targets]
                pool = ThreadPool(int(args.threads))

                pool.map(run_cmd, cmds)
            self.process_output(targets)

This verifies the binary exists, sets the timeout used for Popen, then calls self.get_targets to generate a list of dictionaries that contain placeholder data that will be inserted into the command. Next it checks if the no_binary flag was passed, and if not, builds the command with self.build_cmd which contains format placeholders. It then builds a full list of command targets with the command split into a list, as well as the timeout appended on the end. Finally, the whole mess is sent to pool.map for multithreading goodness!
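The placeholder expansion is easier to see in isolation. The sketch below reuses the list comprehension from run with made-up values: the target dictionaries, binary path, and timeout are assumptions standing in for what get_targets, build_cmd, and the parsed arguments would supply.

```python
import shlex

# Hypothetical values standing in for what get_targets() and build_cmd() return.
targets = [
    {'target': 'a.example.com', 'output': '/tmp/out/a.example.com'},
    {'target': 'b.example.com', 'output': '/tmp/out/b.example.com'},
]
cmd = "/usr/bin/tko-subs -domain {target} -output {output}"
timeout = 300

# Same expression as in run(): fill the placeholders, split into an argv list,
# then append the timeout as the final element.
cmds = [shlex.split(cmd.format(**t)) + [timeout] for t in targets]

for c in cmds:
    print(c)
```

Each entry in cmds is one argv list with the timeout tacked on the end, which is the shape handed to pool.map. Because the keys in each dictionary are passed to format as keyword arguments, every placeholder used in the command string needs a matching key in every target dictionary.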

A note on the timeout: The timeout doesn't actually kill the process if it runs too long, it just moves on. This is good for things like directory brute forcing, so sites that respond... very... slowly... don't bog down everything. Update: it seems Python 3 actually kills the process when it times out.
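As a rough illustration of the Python 3 behavior, the following sketch uses subprocess.run, which kills the child process when the timeout expires and then raises TimeoutExpired. The run_cmd function here is a hypothetical worker written for this example; Armory's own run_cmd is not shown on this page and may differ.

```python
import subprocess
import sys


def run_cmd(cmd):
    """Hypothetical worker: cmd is an argv list with the timeout as its last item."""
    argv, timeout = cmd[:-1], cmd[-1]
    try:
        # On Python 3, subprocess.run kills the child when the timeout expires,
        # then re-raises TimeoutExpired.
        subprocess.run(argv, timeout=timeout)
        return "finished"
    except subprocess.TimeoutExpired:
        return "timed out"


# A child that sleeps for 5 seconds, given a 1 second timeout.
slow = [sys.executable, "-c", "import time; time.sleep(5)", 1]
status = run_cmd(slow)
print(status)
```

Catching the exception lets the worker thread move on to the next target instead of taking the whole pool down.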

As an example, we'll build a module for tko-subs, a tool used to check various domains to see if they are vulnerable to hijacking. (The tool can be found here).

Create the skeleton

For the first step, we'll copy the skeleton for a tool from the SampleToolModule.py module into a new Tko-subs.py file.

cd armory/included/modules
cp SampleToolModule.py Tko-subs.py

At this point, the file Tko-subs.py contains the following code:

#!/usr/bin/python

from armory.included.ModuleTemplate import ToolTemplate

class Module(ToolTemplate):
    '''
    This is a sample skeleton for building a module to run a tool.
    '''    
    name = "SampleToolModule"
    binary_name = "sample-tool"

    def set_options(self):
        super(Module, self).set_options()

        self.options.add_argument('-p', '--print_message', help="Message to print")

        # Change the default timeout
        self.options.set_defaults(timeout="15")
    
    def get_targets(self, args):
        '''
        This module is used to build out a target list containing dictionaries that will get plugged into
        the commands that are executed.
        '''

        return []

    def build_cmd(self, args):
        '''
        Create the actual command that will be executed. Use {target}, {output}, etc as placeholders. These
        should match up to the dictionaries returned by get_targets()
        '''
        
        return ''

    def process_output(self, cmds):
        '''
        Process the output generated by the earlier commands.
        '''

Initial Customization

As the very first thing, we'll update the name field to reflect the module, as well as update the binary_name. This name will be searched for in the path if the --binary flag is not provided. We'll also update the help text.

class Module(ToolTemplate):
    '''
    This tool will check various subdomains for domain takeover vulnerabilities.
    '''    
    name = "Tko-subs"
    binary_name = "tko-subs"

Custom Parameters

The first function, set_options, is used to add any additional options that will be used with the tool. These are configured via argparse's add_argument function, and will also show up in the module's help output. The template has the following options predefined:

        self.options.add_argument('-b', '--binary', help="Path to the binary")
        self.options.add_argument('-o', '--output_path', help="Relative path (to the base directory) to store output", default=self.name)
        self.options.add_argument('--threads', help="Number of Armory threads to use", default="10")
        self.options.add_argument('--timeout', help="Thread timeout in seconds, default is 300.", default="300")
        self.options.add_argument('--tool_args', help="Additional arguments to be passed to the tool", nargs=argparse.REMAINDER)
        self.options.add_argument('--no_binary', help="Runs through without actually running the binary. Useful for if you already ran the tool and just want to process the output.", action="store_true")
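The --tool_args option is the least obvious of these, since it uses argparse.REMAINDER. The sketch below builds a standalone parser with two of the predefined options copied from above and shows what ends up in args.tool_args; the parser itself and the sample command line are assumptions for illustration.

```python
import argparse

parser = argparse.ArgumentParser(prog="Tko-subs")
# Two of the predefined options, copied from the template above.
parser.add_argument('--threads', help="Number of Armory threads to use", default="10")
parser.add_argument('--tool_args', help="Additional arguments to be passed to the tool",
                    nargs=argparse.REMAINDER)

# Everything after --tool_args is captured as-is, including dash-prefixed flags.
args = parser.parse_args(['--threads', '4', '--tool_args', '-takeover', '-threads', '20'])
print(args.threads)
print(args.tool_args)

# run() later joins the list back into a single string.
tool_args = ' '.join(args.tool_args)
print(tool_args)
```

Two things follow from this: --tool_args has to come last on the command line, since it swallows everything after it, and option values such as --threads arrive as strings, which is why run converts them with int().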

This tool has the following options available:

Usage of ./tko-subs:
  -data string
    	CSV file containing CMS providers' string for identification (default "providers-data.csv")
  -domain string
    	Domains separated by ,
  -domains string
    	List of domains to check (default "domains.txt")
  -githubtoken string
    	Github personal access token
  -herokuapikey string
    	Heroku API key
  -herokuappname string
    	Heroku app name
  -herokuusername string
    	Heroku username
  -output string
    	Output file to save the results (default "output.csv")
  -takeover
    	Flag to denote if a vulnerable domain needs to be taken over or not
  -threads int
    	Number of threads to run parallel (default 5)

For our module, we want to take direct control over the data, domain, domains and output arguments. The data argument will point to the "providers-data.csv" file needed by the tool. The domain and domains arguments will be used to provide the targets to the tool, and the output argument specifies where to save the output. We'll modify the set_options function to include these.

    def set_options(self):
        super(Module, self).set_options()

        self.options.add_argument('--data', help="Path to the providers-data.csv file")
        self.options.add_argument('-d', '--domain', help="Domain to run the tool against.")
        self.options.add_argument('-i', '--importdb', help="Import subdomains from the database.", action="store_true")
        self.options.add_argument('--rescan', help="Rescan already processed entries", action="store_true")

Defining Targets

The next function we need to modify is the get_targets function. This gets the parsed arguments passed to it, and returns a list of dictionaries containing the targets and output files. We will build a target list based on the domain name passed with -d, the domains imported directly from the database with -i, or both.

    def get_targets(self, args):
        '''
        This module is used to build out a target list and output file list, depending on the arguments. Should return a
        list in the format [{'target':target, 'output':output}, {'target':target, 'output':output}, etc, etc]
        '''

        # Create an empty list to add targets to
        domains = []
        
        # Check if a domain has been explicitly passed, and if so add it to targets
        if args.domain:
            domains.append(args.domain)

        # Check if the --importdb option was passed and if so get the domains
        # from the database. The scoping is set to "active" since the tool could
        # potentially make a request to the server.

        if args.importdb:
            if args.rescan:
                all_domains = self.Domain.all(scope_type="active")
            else:
                all_domains = self.Domain.all(scope_type="active", tool=self.name)
            for d in all_domains:
                domains.append(d.domain)


        # Set the output_path base, as a junction of the base_path and the path name supplied
        if args.output_path[0] == "/":
            output_path = os.path.join(self.base_config['PROJECT']['base_path'], args.output_path[1:] )
        else:
            output_path = os.path.join(self.base_config['PROJECT']['base_path'], args.output_path)

        # Create the path if it doesn't already exist
        if not os.path.exists(output_path):
            os.makedirs(output_path)

        res = []

        # Create the final targets list, with output paths added.
        for d in domains:
            res.append({'target':d, 'output':os.path.join(output_path, d)})

        return res
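The check for a leading "/" in output_path matters because of how path joining treats absolute components. A small sketch, using posixpath so the result is the same on any platform, and a made-up base path:

```python
import posixpath

base_path = "/home/user/project"  # hypothetical PROJECT base_path

# An absolute second component discards everything before it.
escaped = posixpath.join(base_path, "/Tko-subs")
print(escaped)

# Stripping the leading slash keeps the output inside the project directory.
kept = posixpath.join(base_path, "/Tko-subs"[1:])
print(kept)
```

Without the strip, a user passing an output path that starts with "/" would write results outside of the project's base directory.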

We've also added another import for the DomainRepository class, as well as an __init__ function to set up the database.

# New imports at the top of the file
from armory.database.repositories import DomainRepository
import os

    # New method inside the Module class
    def __init__(self, db):
        self.db = db
        self.Domain = DomainRepository(db, self.name)

Building the Tool Command

Next we set up the build_cmd function. This takes the parsed arguments and builds out the actual command template that will be used, with format placeholders for the target and output filenames. This returns a string containing both {target} and {output}, since these were used in the dictionaries from get_targets.

    def build_cmd(self, args):
        '''
        Create the actual command that will be executed. Use {target} and {output} as placeholders.
        '''
        cmd = self.binary + " -domain {target}  -output {output} "
        
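        # Add in any extra arguments passed in the tool_args parameter
        if args.tool_args:
            cmd += args.tool_args

        # Add the data parameter only if one was supplied, so a missing --data
        # doesn't raise a TypeError when concatenating None
        if args.data:
            cmd += " -data " + args.data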
        return cmd

Processing Output

Now we have a module that will run the tool, and generate output. The final step is to go get that output and ingest it back into the database. This won't always be necessary, depending on the tool. We'll modify the process_output function to parse all the output files. It is passed the target list generated earlier.

    def process_output(self, cmds):
        '''
        Process the output generated by the earlier commands.
        '''

        # Cycle through all of the targets we ran earlier
        for c in cmds:
            
            output_file = c['output']
            target = c['target']

            # Read file, closing the handle once the contents are loaded
            with open(output_file) as f:
                data = f.read().split('\n')

            # Quick and dirty way to filter out headers and blank lines, as well
            # as duplicates
            res = list(set([d for d in data if 'Domain,Cname,Provider' not in d and d]))
            if res:
                # Load up the DB entry.
                created, subdomain = self.Domain.find_or_create(domain=target)

                # Process results
                for d in res:
                    results = d.split(',')

                    if results[3] == "false":
                        display_warning("Hosting found at {} for {}, not vulnerable.".format(target, results[2]))

                    elif results[3] == "true":
                        display_new("{} vulnerable to {}!".format(target, results[2]))
                        if not subdomain.meta[self.name].get('vulnerable', False):
                            subdomain.meta[self.name]['vulnerable'] = []
                        subdomain.meta[self.name]['vulnerable'].append(d)
                        
                    else:
                        display_warning("Not sure of result: {}".format(d))
                
                # This is a little hackish, but needed to get data to save
                t = dict(subdomain.meta)
                self.Domain.commit()
                subdomain.meta = t

                        
        self.Domain.commit()
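The parsing step can be tried on its own, without a database. The sketch below is an alternative take using the csv module rather than splitting on commas by hand. The sample rows are invented, and the column layout (provider at index 2, a "true"/"false" flag at index 3, and the name of the fourth header column) is an assumption inferred from the indices used in process_output.

```python
import csv
import io

# Hypothetical tool output; the rows and the fourth header name are made up.
sample = (
    "Domain,Cname,Provider,IsVulnerable\n"
    "blog.example.com,example.github.io,github,true\n"
    "shop.example.com,shops.myshopify.com,shopify,false\n"
    "blog.example.com,example.github.io,github,true\n"
    "\n"
)


def parse_results(handle):
    """Return (vulnerable, not_vulnerable) lists of unique rows, skipping the header."""
    vulnerable, not_vulnerable = [], []
    seen = set()
    for row in csv.reader(handle):
        # Skip blank lines, short rows, the header, and duplicates.
        if len(row) < 4 or row[0] == "Domain" or tuple(row) in seen:
            continue
        seen.add(tuple(row))
        if row[3] == "true":
            vulnerable.append(row)
        elif row[3] == "false":
            not_vulnerable.append(row)
    return vulnerable, not_vulnerable


vulnerable, not_vulnerable = parse_results(io.StringIO(sample))
print(vulnerable)
print(not_vulnerable)
```

Checking the row length before indexing avoids an IndexError on truncated lines, which can happen if the tool is killed by the timeout partway through writing its output.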

Final Module

Finally, our module should look like this:

#!/usr/bin/python

from armory.included.ModuleTemplate import ToolTemplate
from armory.database.repositories import DomainRepository
from armory.included.utilities.color_display import display_new, display_error, display_warning, display
import os

class Module(ToolTemplate):
    '''
    This tool will check various subdomains for domain takeover vulnerabilities.
    '''    
    name = "Tko-subs"
    binary_name = "tko-subs"

    def __init__(self, db):
        self.db = db
        self.Domain = DomainRepository(db, self.name)
    

    def set_options(self):
        super(Module, self).set_options()

        self.options.add_argument('--data', help="Path to the providers-data.csv file")
        self.options.add_argument('-d', '--domain', help="Domain to run the tool against.")
        self.options.add_argument('-i', '--importdb', help="Import subdomains from the database.", action="store_true")
        self.options.add_argument('--rescan', help="Rescan already processed entries", action="store_true")
    
    def get_targets(self, args):
        '''
        This module is used to build out a target list and output file list, depending on the arguments. Should return a
        list in the format [{'target':target, 'output':output}, {'target':target, 'output':output}, etc, etc]
        '''

        # Create an empty list to add targets to
        domains = []
        
        # Check if a domain has been explicitly passed, and if so add it to targets
        if args.domain:
            domains.append(args.domain)

        # Check if the --importdb option was passed and if so get the domains
        # from the database. The scoping is set to "active" since the tool could
        # potentially make a request to the server.

        if args.importdb:
            if args.rescan:
                all_domains = self.Domain.all(scope_type="active")
            else:
                all_domains = self.Domain.all(scope_type="active", tool=self.name)
            for d in all_domains:
                domains.append(d.domain)


        # Set the output_path base, as a junction of the base_path and the path name supplied
        if args.output_path[0] == "/":
            output_path = os.path.join(self.base_config['PROJECT']['base_path'], args.output_path[1:] )
        else:
            output_path = os.path.join(self.base_config['PROJECT']['base_path'], args.output_path)

        # Create the path if it doesn't already exist
        if not os.path.exists(output_path):
            os.makedirs(output_path)

        res = []

        # Create the final targets list, with output paths added.
        for d in domains:
            res.append({'target':d, 'output':os.path.join(output_path, d)})

        return res

    def build_cmd(self, args):
        '''
        Create the actual command that will be executed. Use {target} and {output} as placeholders.
        '''
        cmd = self.binary + " -domain {target}  -output {output} "
        
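        # Add in any extra arguments passed in the tool_args parameter
        if args.tool_args:
            cmd += args.tool_args

        # Add the data parameter only if one was supplied, so a missing --data
        # doesn't raise a TypeError when concatenating None
        if args.data:
            cmd += " -data " + args.data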
        return cmd
        

    def process_output(self, cmds):
        '''
        Process the output generated by the earlier commands.
        '''

        # Cycle through all of the targets we ran earlier
        for c in cmds:
            
            output_file = c['output']
            target = c['target']

            # Read file, closing the handle once the contents are loaded
            with open(output_file) as f:
                data = f.read().split('\n')

            # Quick and dirty way to filter out headers and blank lines, as well
            # as duplicates
            res = list(set([d for d in data if 'Domain,Cname,Provider' not in d and d]))
            if res:
                # Load up the DB entry.
                created, subdomain = self.Domain.find_or_create(domain=target)

                # Process results
                for d in res:
                    results = d.split(',')

                    if results[3] == "false":
                        display_warning("Hosting found at {} for {}, not vulnerable.".format(target, results[2]))

                    elif results[3] == "true":
                        display_new("{} vulnerable to {}!".format(target, results[2]))
                        if not subdomain.meta[self.name].get('vulnerable', False):
                            subdomain.meta[self.name]['vulnerable'] = []
                        subdomain.meta[self.name]['vulnerable'].append(d)
                        
                    else:
                        display_warning("Not sure of result: {}".format(d))
                
                # This is a little hackish, but needed to get data to save
                t = dict(subdomain.meta)
                self.Domain.commit()
                subdomain.meta = t

                        
        self.Domain.commit()