Skip to content

Commit

Permalink
Add a cli that makes sense to the script. Full usage is now reported …
Browse files Browse the repository at this point in the history
…before the error message.
  • Loading branch information
rowingdude committed Aug 13, 2024
1 parent 81ae29e commit 64994e9
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 43 deletions.
40 changes: 19 additions & 21 deletions analyzeMFT.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,35 @@
# 2-Aug-24
# - Updating to current PEP


import sys
import logging
from analyzemft import mft_session
from analyzemft.mft import set_default_options
from analyzemft.config import Config
from analyzemft.mft_session import MftSession

def setup_logging(log_level: str):
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid log level: {log_level}')
logging.basicConfig(level=numeric_level, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

try:
# Get the command line options
parser = set_default_options()
options = parser.parse_args()

# Create and initialize the MftSession
session = mft_session.MftSession()
session.options = options
config = Config()
config.parse_args()
conf = config.get_config()

# Set up debug logging if requested
if options.debug:
logging.getLogger().setLevel(logging.DEBUG)
setup_logging(conf['log_level'])

# Run the MFT analysis
session = MftSession(conf)

try:
session.open_files()
session.process_mft_file()
session.print_records()
session.close_files()

except Exception as e:
logging.error(f"An error occurred: {e}")
sys.exit(1)
finally:
session.close_files()

if __name__ == "__main__":
main()
main()
15 changes: 8 additions & 7 deletions analyzemft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
# Author: Benjamin Cance (bjc@tdx.li)
# Copyright Benjamin Cance 2024


from . import mft_utils
from . import mft
from . import mft_session
from . import mft_formatters

__all__ = ["mft_utils","mft_formatters", "mft", "mft_session"]
from .config import Config
from .mft_session import MftSession
from .mft_analyzer import MFTAnalyzer
from .mft_formatters import mft_to_csv, mft_to_body, mft_to_l2t, mft_to_json
from .mft_utils import WindowsTime, decodeMFTmagic, decodeMFTisactive, decodeMFTrecordtype, decodeVolumeInfo, decodeObjectID

__all__ = ['Config', 'MftSession', 'MFTAnalyzer', 'mft_to_csv', 'mft_to_body', 'mft_to_l2t', 'mft_to_json',
'WindowsTime', 'decodeMFTmagic', 'decodeMFTisactive', 'decodeMFTrecordtype', 'decodeVolumeInfo', 'decodeObjectID']
31 changes: 31 additions & 0 deletions analyzemft/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from pathlib import Path
from .config import Config
from .mft_session import MftSession

def setup_logging(log_level: str):
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid log level: {log_level}')
logging.basicConfig(level=numeric_level, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
config = Config()
config.parse_args()
conf = config.get_config()

setup_logging(conf['log_level'])

session = MftSession(conf)

try:
session.open_files()
session.process_mft_file()
session.print_records()
except Exception as e:
logging.error(f"An error occurred: {e}")
finally:
session.close_files()

if __name__ == "__main__":
main()
66 changes: 66 additions & 0 deletions analyzemft/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import argparse
import json
from typing import Dict, Any
from pathlib import Path

DEFAULT_CONFIG = {
"debug": False,
"localtz": None,
"bodystd": False,
"bodyfull": False,
"json_output": False,
"output_dir": "output",
"csv_filename": "mft_output.csv",
"bodyfile_name": "mft_bodyfile",
"json_filename": "mft_output.json",
"log_level": "INFO"
}

class Config:
def __init__(self):
self.parser = self.create_argument_parser()
self.args = None
self.config: Dict[str, Any] = DEFAULT_CONFIG.copy()

def create_argument_parser(self) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="MFT Analyzer")
parser.add_argument("input_file", help="Path to the MFT file to analyze")
parser.add_argument("--config", help="Path to a JSON configuration file")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument("--localtz", help="Use local timezone")
parser.add_argument("--bodystd", action="store_true", help="Use standard body format")
parser.add_argument("--bodyfull", action="store_true", help="Use full body format")
parser.add_argument("--json", action="store_true", help="Output in JSON format")
parser.add_argument("--output-dir", help="Directory for output files")
parser.add_argument("--csv-filename", help="Name of the CSV output file")
parser.add_argument("--bodyfile-name", help="Name of the body file")
parser.add_argument("--json-filename", help="Name of the JSON output file")
parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level")
return parser

def load_config_file(self, config_file: str):
try:
with open(config_file, 'r') as f:
file_config = json.load(f)
self.config.update(file_config)
except json.JSONDecodeError:
print(f"Error: The config file {config_file} is not valid JSON.")
exit(1)
except FileNotFoundError:
print(f"Error: The config file {config_file} was not found.")
exit(1)

def parse_args(self):
self.args = self.parser.parse_args()

if self.args.config:
self.load_config_file(self.args.config)

# Override config with command-line arguments
for arg, value in vars(self.args).items():
if value is not None:
self.config[arg] = value

def get_config(self) -> Dict[str, Any]:
return self.config
7 changes: 4 additions & 3 deletions analyzemft/mft_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@
from .mft_utils import WindowsTime, decodeMFTmagic, decodeMFTisactive, decodeMFTrecordtype, decodeVolumeInfo, decodeObjectID

class MFTAnalyzer:
def __init__(self, options: Any):
self.options = options
def __init__(self, config: Dict[str, Any]):
self.config = config
self.mft: Dict[int, Dict[str, Any]] = {}
self.folders: Dict[str, str] = {}
self.num_records: int = 0
self.logger = logging.getLogger(__name__)
self.setup_logging()

def setup_logging(self):
level = logging.DEBUG if self.options.debug else logging.INFO
level = logging.DEBUG if self.config['debug'] else logging.INFO
logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')


def process_mft_file(self, file_mft: BinaryIO):
self.num_records = 0
while True:
Expand Down
20 changes: 8 additions & 12 deletions analyzemft/mft_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,23 @@
# Author: Benjamin Cance (bjc@tdx.li)
# Copyright Benjamin Cance 2024

VERSION='2.1.1'

import json
import logging
from pathlib import Path
from typing import TextIO
from typing import Dict, Any
from .mft_analyzer import MFTAnalyzer
from .mft_formatters import mft_to_csv, mft_to_body, mft_to_l2t, mft_to_json


class MftSession:
def __init__(self):
def __init__(self, config: Dict[str, Any]):
self.config = config
self.mft = {}
self.folders = {}
self.analyzer = None
self.options = None
self.file_mft: TextIO = None
self.file_mft = None
self.file_csv = None
self.file_body: TextIO = None
self.file_csv_time: TextIO = None
self.setup_logging()
self.file_body = None
self.file_json = None
self.analyzer = None


def setup_logging(self):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
Expand Down

0 comments on commit 64994e9

Please sign in to comment.