Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ntfs path hint #2702

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 267 additions & 0 deletions plaso/parsers/ntfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from plaso.parsers import manager


# pylint: disable=too-many-instance-attributes,too-few-public-methods
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which version of pylint did you use?

class NTFSFileStatEventData(events.EventData):
"""NTFS file system stat event data.

Expand All @@ -35,6 +36,8 @@ class NTFSFileStatEventData(events.EventData):
name (str): name associated with the stat event, for example that of
a $FILE_NAME attribute or None if not available.
parent_file_reference (int): NTFS file reference of the parent.
path_hint (str): A path to the NTFS file constructed from the
`parent_file_reference`
"""

DATA_TYPE = 'fs:stat:ntfs'
Expand All @@ -49,6 +52,7 @@ def __init__(self):
self.is_allocated = None
self.name = None
self.parent_file_reference = None
self.path_hint = None


class NTFSUSNChangeEventData(events.EventData):
Expand Down Expand Up @@ -90,6 +94,14 @@ class NTFSMFTParser(interface.FileObjectParser):
_MFT_ATTRIBUTE_STANDARD_INFORMATION = 0x00000010
_MFT_ATTRIBUTE_FILE_NAME = 0x00000030
_MFT_ATTRIBUTE_OBJECT_ID = 0x00000040
_PATH_SEPARATOR = '/'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use the forward slash?

_PATH_NO_NAME_REPLACEMENT = '???'
_PATH_NAME_ORPHAN = '$Orphan'

def __init__(self):
  """Initializes a NTFS $MFT parser."""
  super(NTFSMFTParser, self).__init__()
  # Lookup table filled by _CollectMFTEntryPathInfo, keyed by MFT record
  # number. Each value is a tuple of (sequence number, allocation state,
  # $FILE_NAME attribute infos) and is used to build path hints.
  self.path_info = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not self.path_info = {} ?


@classmethod
def GetFormatSpecification(cls):
Expand Down Expand Up @@ -167,6 +179,19 @@ def _ParseMFTAttribute(self, parser_mediator, mft_entry, mft_attribute):
event_data.name = name
event_data.parent_file_reference = parent_file_reference

if mft_attribute.attribute_type == self._MFT_ATTRIBUTE_FILE_NAME:
parent_record_number = parent_file_reference & 0xffffffffffff
parent_sequence_number = parent_file_reference >> 48
event_data.path_hint = self._GetPathForFile(
parser_mediator, name, parent_record_number, parent_sequence_number)
else:
# Even though $SI attributes do not carry a name, we are
# opportunistic and use the most descriptive name available
(name, parent_record_number,
parent_sequence_number) = self._GetNameAndParentFromEntry(mft_entry)
event_data.path_hint = self._GetPathForFile(
parser_mediator, name, parent_record_number, parent_sequence_number)

try:
creation_time = mft_attribute.get_creation_time_as_integer()
except OverflowError as exception:
Expand Down Expand Up @@ -275,6 +300,238 @@ def _ParseMFTEntry(self, parser_mediator, mft_entry):
'unable to parse MFT attribute: {0:d} with error: {1!s}').format(
attribute_index, exception))

def _GetFNAttributeInfos(self, mft_entry):
"""Returns a list of tuples containing information required to derive
the most descriptive name for a record.

Args:
mft_entry (pyfsntfs.file_entry): MFT entry

Returns:
list[tuple]: List of tuples with (name, attribute_index,
parent_record_number, parent_sequence_number)

"""
attribute_info = []
for attribute_index in range(0, mft_entry.number_of_attributes):
mft_attribute = mft_entry.get_attribute(attribute_index)
if mft_attribute.attribute_type == self._MFT_ATTRIBUTE_FILE_NAME:
parent_file_reference = mft_attribute.parent_file_reference
parent_record_number = parent_file_reference & 0xFFFFFFFFFFFF
parent_sequence_number = parent_file_reference >> 48
attribute_info.append((getattr(mft_attribute, 'name', ''),
attribute_index,
parent_record_number,
parent_sequence_number))
return attribute_info

def _GetNameAndParentFromAttributeInfos(self, attribute_infos):
"""Returns the most descriptive name, parent entry record number and
sequence number from the return value of `GetFNAttributeInfos`.

Each $MFT entry can have multiple $FILE_NAME attributes containing
different names. One prominent example is when a file/folder name
exceeds 8 characters, the $MFT will then contain two entries, one
"normal" entry with the "full" name, and a DOS-compatible one
(8.3). Each $FN attribute has a namespace value that denotes the
name's type:

0x0: POSIX (Case sensitive; all unicode except '/' and NULL)
0x1: Win32 (Case insensitive; all unicode except '/', '\', ':',
'>', '<', '?')
0x2: DOS (Case insensitive; all upper case and no special characters.
Must be 8 or fewer for name, 3 or less for the extension)
0x3: Win32 & DOS (When the name is Win32 but does already fit in
the DOS namespace)

Rule of precedence for this function is: "0x3 > 0x1 > 0x0 > 0x2".
On same value entries, the lower attribute index wins

TODO: `namespace` (byte 65 of the $FILE_NAME attribute) is not
available in `pyfsntfs.file_name_attribute`. For now we "guess"
and go with the "longest string is best string, with lower
attribute index preference"

Args:
attribute_infos (list[tuple]): A list of tuples produced by
`_GetFNAttributeInfos`

Returns:
tuple: A tuple of (name, parent_record_number, parent_sequence_number)

"""

name = None
parent_record_number = None
parent_sequence_number = None

# Sort the attributes by re-mapping the `namespace` values
Copy link
Member

@joachimmetz joachimmetz Nov 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code is not used, please remove

# ns_map = {
# 0x0: 0x2,
# 0x1: 0x1,
# 0x2: 0x3,
# 0x3: 0x0
# }
# file_name_attributes.sort(key=lambda a: (ns_map[a[4]], a[1]))

# Sort by name length and attribute index
attribute_infos.sort(key=lambda a: (len(a[0]), a[1]), reverse=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per style guide lambda a => lambda attribute


# Go through the sorted attribute infos, first one that fulfills
# our criteria wins (criteria being name must not be empty)
for (attribute_name, _, attribute_parent_num,
attribute_parent_seq) in attribute_infos:
if attribute_name:
name = attribute_name
parent_record_number = attribute_parent_num
parent_sequence_number = attribute_parent_seq
break

# If we have not found suitable entry and there are entries, we take
# the first one as last resort
if attribute_infos and name is None:
name, _, parent_record_number, parent_sequence_number = attribute_infos[0]

return (name, parent_record_number, parent_sequence_number)

def _GetNameAndParentFromEntry(self, mft_entry):
"""Returns the most descriptive name, its parent entry record number
and sequence number from a given MFT record entry

Args:
mft_entry (pyfsntfs.file_entry): MFT entry

Returns:
tuple: A tuple of (name, parent_record_number parent_sequence_number)

"""

attribute_infos = self._GetFNAttributeInfos(mft_entry)
return self._GetNameAndParentFromAttributeInfos(attribute_infos)


def _CollectMFTEntryPathInfo(self, mft_entry):
"""Extracts data from a given $MFT entry and stores it for lookup in
order to be able to build a parent path of a file entry. This
creates a map of the entries' record number to its sequence number,
allocation status and a list of its names an parents.

Args:
mft_entry (pyfsntfs.file_entry): MFT entry.

Raises:
IOError: if MFT is not readable

"""

if mft_entry.is_empty() or mft_entry.base_record_file_reference != 0:
return

entry_reference = mft_entry.file_reference
entry_record_number = entry_reference & 0xFFFFFFFFFFFF
entry_sequence_number = entry_reference >> 48
entry_allocated = mft_entry.is_allocated()

self.path_info[entry_record_number] = (
entry_sequence_number,
entry_allocated,
self._GetFNAttributeInfos(mft_entry))

def _GetPathForFile(self, parser_mediator, filename, parent_record_number,
parent_sequence_number):
"""Crafts a full path for a given filename, given its parent
record and sequence number.

Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
filename (str): The filename
parent_record_number (int): The parent record number to craft
the path (from the files $FN)
parent_sequence_number (int): The sequence number of the parent
(from the files $FN)

Returns:
str: The full path of the entry

"""

path_parents = self._ResolvePath(
parser_mediator, parent_record_number, parent_sequence_number)
if not path_parents:
return filename
path_parents.reverse()
path_parents.append(filename)
return self._PATH_SEPARATOR.join(path_parents)

def _ResolvePath(self, parser_mediator, record_number, sequence_number,
path_parts=None, used_records=None):
"""Constructs a path for an entry by looking up the
`record_number`, comparing the expected `sequence_number` (for
orphaned files). Crafts the parents by appending to a list, which
is why the return value is in reverse order!

Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
record_number (int): The record number to start the path resolution
sequence_number (int): The expected sequence number
path_parts (list): A list that gets appended the path parts in
recursive calls
used_records (set): A set used to track which entries have been
used in order to break cyclic paths

Returns:
list: List of parent path objects in reverse order

"""
if path_parts is None:
path_parts = []

if used_records is None:
used_records = set()

if not record_number or \
not sequence_number or \
record_number not in self.path_info:
return path_parts

# Get the info from the map for the next parent
(parent_sequence_number, parent_entry_allocated,
parent_entry_attributes) = self.path_info.get(
record_number, (None, None, ()))

# If the entry does not have a legitimate parent, it's orphaned.
# This is the case when the parent sequence number is higher than
# the entry expects and the parent is allocated: The parent record
# was reused.
if (parent_sequence_number > sequence_number and parent_entry_allocated):
path_parts.append(self._PATH_NAME_ORPHAN)
return path_parts

# Since we are a parent (a folder), there should only be one
# reasonable $FN entry and all parent record numbers must be the
# same. Warn if this is not the case
if len(set(map(lambda i: i[2], parent_entry_attributes))) > 1:
parser_mediator.ProduceExtractionWarning((
'$MFT entry {0!s} is parent but carries multiple $FILE_NAME'
'attributes with different parents!').format(record_number))
(parent_name, parent_number,
parent_sequence) = self._GetNameAndParentFromAttributeInfos(
parent_entry_attributes)

if parent_name:
path_parts.append(parent_name)
elif parent_number:
# For some reason we have no name but a parent
path_parts.append(self._PATH_NO_NAME_REPLACEMENT)

if record_number != parent_number and parent_number not in used_records:
used_records.add(parent_number)
self._ResolvePath(parser_mediator, parent_number, parent_sequence,
path_parts, used_records)
return path_parts

def ParseFileObject(self, parser_mediator, file_object):
"""Parses a NTFS $MFT metadata file-like object.

Expand All @@ -291,6 +548,16 @@ def ParseFileObject(self, parser_mediator, file_object):
parser_mediator.ProduceExtractionWarning(
'unable to open file with error: {0!s}'.format(exception))

# Collect path information in a first round of parsing
for entry_index in range(0, mft_metadata_file.number_of_file_entries):
try:
mft_entry = mft_metadata_file.get_file_entry(entry_index)
self._CollectMFTEntryPathInfo(mft_entry)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this for every MFT entry can be memory intensive. Technically, we only need to cache the parent path hints in self.path_info.

except IOError as exception:
# We ignore the exception as it will be raised again in the
# MFT entry processing below
pass

for entry_index in range(0, mft_metadata_file.number_of_file_entries):
try:
mft_entry = mft_metadata_file.get_file_entry(entry_index)
Expand Down
61 changes: 61 additions & 0 deletions tests/parsers/ntfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,60 @@ def testParseFile(self):
self._TestGetMessageStrings(
event_data, expected_message, expected_short_message)

# Test path_hint with simple file (SAM)
event = events[28745]
event_data = self._GetEventDataOfEvent(storage_writer, event)
self.assertEqual(event_data.name, 'SAM')
self.assertEqual(event_data.path_hint, './WINDOWS/system32/config/SAM')

# Test path_hint with deleted file 'CAJA1S19.js'
event = events[120480]
event_data = self._GetEventDataOfEvent(storage_writer, event)
self.assertEqual(event_data.name, 'CAJA1S19.js')
self.assertEqual(
event_data.path_hint,
'./Documents and Settings/Donald Blake/Local Settings/Temporary Internet Files/Content.IE5/9EUWFPZ1/CAJA1S19.js') # pylint: disable=line-too-long
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't do these style overrides; there is a style guide for a reason.

self.assertFalse(event_data.is_allocated)

# Testing path_hint of orphaned entry '/session/menu.text.css'
event = events[125436]
event_data = self._GetEventDataOfEvent(storage_writer, event)
file_parent_id = event_data.parent_file_reference & 0xffffffff
file_parent_seq = event_data.parent_file_reference >> 48
self.assertEqual(event_data.name, 'menu.text.css')
self.assertEqual(event_data.path_hint, '$Orphan/session/menu.text.css')

event = events[125400]
event_data = self._GetEventDataOfEvent(storage_writer, event)
folder_allocation = event_data.is_allocated
folder_id = event_data.file_reference & 0xffffffff
folder_seq = event_data.file_reference >> 48
folder_parent_id = event_data.parent_file_reference & 0xffffffff
folder_parent_seq = event_data.parent_file_reference >> 48
self.assertEqual(event_data.name, 'session')
self.assertEqual(event_data.path_hint, '$Orphan/session')
self.assertEqual(file_parent_id, folder_id)
# Assert that the folder's sequence number is just one above the
# sequence number expected by the file, and the folder is not allocated.
# This is what to expect in this instance as it indicates the
# folder in which the file resides was deleted but the file is
# still associated, i.e. the folder's record was not reused.
self.assertTrue(file_parent_seq == 1
and folder_seq == 2
and not folder_allocation)

event = events[101097]
event_data = self._GetEventDataOfEvent(storage_writer, event)
orphan_allocation = event_data.is_allocated
orphan_id = event_data.file_reference & 0xffffffff
orphan_seq = event_data.file_reference >> 48
self.assertEqual(folder_parent_id, orphan_id)
# Now assert that the sequence number of the parent (the folder
# above 'session') is larger than the expected value and the
# record is allocated, i.e. the record has been reused
self.assertGreater(orphan_seq, folder_parent_seq)
self.assertTrue(orphan_allocation)

def testParseImage(self):
"""Tests the Parse function on a storage media image."""
parser = ntfs.NTFSMFTParser()
Expand Down Expand Up @@ -163,6 +217,13 @@ def testParseImage(self):
self._TestGetMessageStrings(
event_data, expected_message, expected_short_message)

# Check that path hint is ok
event = events[243]
event_data = self._GetEventDataOfEvent(storage_writer, event)
self.assertEqual(
event_data.path_hint,
'./System Volume Information/{3808876b-c176-4e48-b7ae-04046e6cc752}')

# Note that the source file is a RAW (VMDK flat) image.
test_file_path = self._GetTestFilePath(['multi_partition_image.vmdk'])
self._SkipIfPathNotExists(test_file_path)
Expand Down