-
Notifications
You must be signed in to change notification settings - Fork 361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ntfs path hint #2702
Ntfs path hint #2702
Changes from all commits
faafc1c
f4042d5
2622426
11adb89
8f39589
b6d35d5
d697a39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
from plaso.parsers import manager | ||
|
||
|
||
# pylint: disable=too-many-instance-attributes,too-few-public-methods | ||
class NTFSFileStatEventData(events.EventData): | ||
"""NTFS file system stat event data. | ||
|
||
|
@@ -35,6 +36,8 @@ class NTFSFileStatEventData(events.EventData): | |
name (str): name associated with the stat event, for example that of | ||
a $FILE_NAME attribute or None if not available. | ||
parent_file_reference (int): NTFS file reference of the parent. | ||
path_hint (str): A path to the NTFS file constructed from the | ||
`parent_file_reference` | ||
""" | ||
|
||
DATA_TYPE = 'fs:stat:ntfs' | ||
|
@@ -49,6 +52,7 @@ def __init__(self): | |
self.is_allocated = None | ||
self.name = None | ||
self.parent_file_reference = None | ||
self.path_hint = None | ||
|
||
|
||
class NTFSUSNChangeEventData(events.EventData): | ||
|
@@ -90,6 +94,14 @@ class NTFSMFTParser(interface.FileObjectParser): | |
_MFT_ATTRIBUTE_STANDARD_INFORMATION = 0x00000010 | ||
_MFT_ATTRIBUTE_FILE_NAME = 0x00000030 | ||
_MFT_ATTRIBUTE_OBJECT_ID = 0x00000040 | ||
_PATH_SEPARATOR = '/' | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why use the forward slash? |
||
_PATH_NO_NAME_REPLACEMENT = '???' | ||
_PATH_NAME_ORPHAN = '$Orphan' | ||
|
||
def __init__(self):
  """Initializes the NTFS MFT parser."""
  super(NTFSMFTParser, self).__init__()
  # Maps an MFT record number to a tuple of (sequence number, allocation
  # status, list of $FILE_NAME attribute information tuples).
  self.path_info = {}
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not |
||
|
||
@classmethod | ||
def GetFormatSpecification(cls): | ||
|
@@ -167,6 +179,19 @@ def _ParseMFTAttribute(self, parser_mediator, mft_entry, mft_attribute): | |
event_data.name = name | ||
event_data.parent_file_reference = parent_file_reference | ||
|
||
if mft_attribute.attribute_type == self._MFT_ATTRIBUTE_FILE_NAME: | ||
parent_record_number = parent_file_reference & 0xffffffffffff | ||
parent_sequence_number = parent_file_reference >> 48 | ||
event_data.path_hint = self._GetPathForFile( | ||
parser_mediator, name, parent_record_number, parent_sequence_number) | ||
else: | ||
# Even though $SI attributes do not carry a name, we are | ||
# opportunistic and use the most descriptive name available | ||
(name, parent_record_number, | ||
parent_sequence_number) = self._GetNameAndParentFromEntry(mft_entry) | ||
event_data.path_hint = self._GetPathForFile( | ||
parser_mediator, name, parent_record_number, parent_sequence_number) | ||
|
||
try: | ||
creation_time = mft_attribute.get_creation_time_as_integer() | ||
except OverflowError as exception: | ||
|
@@ -275,6 +300,238 @@ def _ParseMFTEntry(self, parser_mediator, mft_entry): | |
'unable to parse MFT attribute: {0:d} with error: {1!s}').format( | ||
attribute_index, exception)) | ||
|
||
def _GetFNAttributeInfos(self, mft_entry): | ||
"""Returns a list of tuples containing information required to derive | ||
the most descriptive name for a record. | ||
|
||
Args: | ||
mft_entry (pyfsntfs.file_entry): MFT entry | ||
|
||
Returns: | ||
list[tuple]: List of tuples with (name, attribute_index, | ||
parent_record_number, parent_sequence_number) | ||
|
||
""" | ||
attribute_info = [] | ||
for attribute_index in range(0, mft_entry.number_of_attributes): | ||
mft_attribute = mft_entry.get_attribute(attribute_index) | ||
if mft_attribute.attribute_type == self._MFT_ATTRIBUTE_FILE_NAME: | ||
parent_file_reference = mft_attribute.parent_file_reference | ||
parent_record_number = parent_file_reference & 0xFFFFFFFFFFFF | ||
parent_sequence_number = parent_file_reference >> 48 | ||
attribute_info.append((getattr(mft_attribute, 'name', ''), | ||
attribute_index, | ||
parent_record_number, | ||
parent_sequence_number)) | ||
return attribute_info | ||
|
||
def _GetNameAndParentFromAttributeInfos(self, attribute_infos): | ||
"""Returns the most descriptive name, parent entry record number and | ||
sequence number from the return value of `GetFNAttributeInfos`. | ||
|
||
Each $MFT entry can have multiple $FILE_NAME attributes containing | ||
different names. One prominent example is when a file/folder name | ||
exceeds 8 characters, the $MFT will then contain two entries, one | ||
"normal" entry with the "full" name, and a DOS-compatible one | ||
(8.3). Each $FN attribute has a namespace value that denotes the | ||
name's type: | ||
|
||
0x0: POSIX (Case sensitive; all unicode except '/' and NULL) | ||
0x1: Win32 (Case insensitive; all unicode except '/', '\', ':', | ||
'>', '<', '?') | ||
0x2: DOS (Case insensitive; all upper case and no special characters. | ||
Must be 8 or fewer for name, 3 or less for the extension) | ||
0x3: Win32 & DOS (When the name is Win32 but does already fit in | ||
the DOS namespace) | ||
|
||
Rule of precedence for this function is: "0x3 > 0x1 > 0x0 > 0x2". | ||
On same value entries, the lower attribute index wins | ||
|
||
TODO: `namespace` (byte 65 of the $FILE_NAME attribute) is not | ||
available in `pyfsntfs.file_name_attribute`. For now we "guess" | ||
and go with the "longest string is best string, with lower | ||
attribute index preference" | ||
|
||
Args: | ||
attribute_infos (list[tuple]): A list of tuples produced by | ||
`_GetFNAttributeInfos` | ||
|
||
Returns: | ||
tuple: A tuple of (name, parent_record_number, parent_sequence_number) | ||
|
||
""" | ||
|
||
name = None | ||
parent_record_number = None | ||
parent_sequence_number = None | ||
|
||
# Sort the attributes by re-mapping the `namespace` values | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. code is not used, please remove |
||
# ns_map = { | ||
# 0x0: 0x2, | ||
# 0x1: 0x1, | ||
# 0x2: 0x3, | ||
# 0x3: 0x0 | ||
# } | ||
# file_name_attributes.sort(key=lambda a: (ns_map[a[4]], a[1])) | ||
|
||
# Sort by name length and attribute index | ||
attribute_infos.sort(key=lambda a: (len(a[0]), a[1]), reverse=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per style guide |
||
|
||
# Go through the sorted attribute infos, first one that fulfills | ||
# our criteria wins (criteria being name must not be empty) | ||
for (attribute_name, _, attribute_parent_num, | ||
attribute_parent_seq) in attribute_infos: | ||
if attribute_name: | ||
name = attribute_name | ||
parent_record_number = attribute_parent_num | ||
parent_sequence_number = attribute_parent_seq | ||
break | ||
|
||
# If we have not found suitable entry and there are entries, we take | ||
# the first one as last resort | ||
if attribute_infos and name is None: | ||
name, _, parent_record_number, parent_sequence_number = attribute_infos[0] | ||
|
||
return (name, parent_record_number, parent_sequence_number) | ||
|
||
def _GetNameAndParentFromEntry(self, mft_entry): | ||
"""Returns the most descriptive name, its parent entry record number | ||
and sequence number from a given MFT record entry | ||
|
||
Args: | ||
mft_entry (pyfsntfs.file_entry): MFT entry | ||
|
||
Returns: | ||
tuple: A tuple of (name, parent_record_number parent_sequence_number) | ||
|
||
""" | ||
|
||
attribute_infos = self._GetFNAttributeInfos(mft_entry) | ||
return self._GetNameAndParentFromAttributeInfos(attribute_infos) | ||
|
||
|
||
def _CollectMFTEntryPathInfo(self, mft_entry): | ||
"""Extracts data from a given $MFT entry and stores it for lookup in | ||
order to be able to build a parent path of a file entry. This | ||
creates a map of the entries' record number to its sequence number, | ||
allocation status and a list of its names an parents. | ||
|
||
Args: | ||
mft_entry (pyfsntfs.file_entry): MFT entry. | ||
|
||
Raises: | ||
IOError: if MFT is not readable | ||
|
||
""" | ||
|
||
if mft_entry.is_empty() or mft_entry.base_record_file_reference != 0: | ||
return | ||
|
||
entry_reference = mft_entry.file_reference | ||
entry_record_number = entry_reference & 0xFFFFFFFFFFFF | ||
entry_sequence_number = entry_reference >> 48 | ||
entry_allocated = mft_entry.is_allocated() | ||
|
||
self.path_info[entry_record_number] = ( | ||
entry_sequence_number, | ||
entry_allocated, | ||
self._GetFNAttributeInfos(mft_entry)) | ||
|
||
def _GetPathForFile(self, parser_mediator, filename, parent_record_number, | ||
parent_sequence_number): | ||
"""Crafts a full path for a given filename, given its parent | ||
record and sequence number. | ||
|
||
Args: | ||
parser_mediator (ParserMediator): mediates interactions between parsers | ||
and other components, such as storage and dfvfs. | ||
filename (str): The filename | ||
parent_record_number (int): The parent record number to craft | ||
the path (from the files $FN) | ||
parent_sequence_number (int): The sequence number of the parent | ||
(from the files $FN) | ||
|
||
Returns: | ||
str: The full path of the entry | ||
|
||
""" | ||
|
||
path_parents = self._ResolvePath( | ||
parser_mediator, parent_record_number, parent_sequence_number) | ||
if not path_parents: | ||
return filename | ||
path_parents.reverse() | ||
path_parents.append(filename) | ||
return self._PATH_SEPARATOR.join(path_parents) | ||
|
||
def _ResolvePath(self, parser_mediator, record_number, sequence_number, | ||
path_parts=None, used_records=None): | ||
"""Constructs a path for an entry by looking up the | ||
`record_number`, comparing the expected `sequence_number` (for | ||
orphaned files). Crafts the parents by appending to a list, which | ||
is why the return value is in reverse order! | ||
|
||
Args: | ||
parser_mediator (ParserMediator): mediates interactions between parsers | ||
and other components, such as storage and dfvfs. | ||
record_number (int): The record number to start the path resolution | ||
sequence_number (int): The expected sequence number | ||
path_parts (list): A list that gets appended the path parts in | ||
recursive calls | ||
used_records (set): A set used to track which entries have been | ||
used in order to break cyclic paths | ||
|
||
Returns: | ||
list: List of parent path objects in reverse order | ||
|
||
""" | ||
if path_parts is None: | ||
path_parts = [] | ||
|
||
if used_records is None: | ||
used_records = set() | ||
|
||
if not record_number or \ | ||
not sequence_number or \ | ||
record_number not in self.path_info: | ||
return path_parts | ||
|
||
# Get the info from the map for the next parent | ||
(parent_sequence_number, parent_entry_allocated, | ||
parent_entry_attributes) = self.path_info.get( | ||
record_number, (None, None, ())) | ||
|
||
# If the entry does not have a legitimate parent, it's orphaned. | ||
# This is the case when the parent sequence number is higher than | ||
# the entry expects and the parent is allocated: The parent record | ||
# was reused. | ||
if (parent_sequence_number > sequence_number and parent_entry_allocated): | ||
path_parts.append(self._PATH_NAME_ORPHAN) | ||
return path_parts | ||
|
||
# Since we are a parent (a folder), there should only be one | ||
# reasonable $FN entry and all parent record numbers must be the | ||
# same. Warn if this is not the case | ||
if len(set(map(lambda i: i[2], parent_entry_attributes))) > 1: | ||
parser_mediator.ProduceExtractionWarning(( | ||
'$MFT entry {0!s} is parent but carries multiple $FILE_NAME' | ||
'attributes with different parents!').format(record_number)) | ||
(parent_name, parent_number, | ||
parent_sequence) = self._GetNameAndParentFromAttributeInfos( | ||
parent_entry_attributes) | ||
|
||
if parent_name: | ||
path_parts.append(parent_name) | ||
elif parent_number: | ||
# For some reason we have no name but a parent | ||
path_parts.append(self._PATH_NO_NAME_REPLACEMENT) | ||
|
||
if record_number != parent_number and parent_number not in used_records: | ||
used_records.add(parent_number) | ||
self._ResolvePath(parser_mediator, parent_number, parent_sequence, | ||
path_parts, used_records) | ||
return path_parts | ||
|
||
def ParseFileObject(self, parser_mediator, file_object): | ||
"""Parses a NTFS $MFT metadata file-like object. | ||
|
||
|
@@ -291,6 +548,16 @@ def ParseFileObject(self, parser_mediator, file_object): | |
parser_mediator.ProduceExtractionWarning( | ||
'unable to open file with error: {0!s}'.format(exception)) | ||
|
||
# Collect path information in a first round of parsing | ||
for entry_index in range(0, mft_metadata_file.number_of_file_entries): | ||
try: | ||
mft_entry = mft_metadata_file.get_file_entry(entry_index) | ||
self._CollectMFTEntryPathInfo(mft_entry) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doing this for every MFT entry can be memory intensive. Technically we only need to cache the parent path hints in |
||
except IOError as exception: | ||
# We ignore the exception as it will be raised again in the | ||
# MFT entry processing below | ||
pass | ||
|
||
for entry_index in range(0, mft_metadata_file.number_of_file_entries): | ||
try: | ||
mft_entry = mft_metadata_file.get_file_entry(entry_index) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,60 @@ def testParseFile(self): | |
self._TestGetMessageStrings( | ||
event_data, expected_message, expected_short_message) | ||
|
||
# Test path_hint with simple file (SAM) | ||
event = events[28745] | ||
event_data = self._GetEventDataOfEvent(storage_writer, event) | ||
self.assertEqual(event_data.name, 'SAM') | ||
self.assertEqual(event_data.path_hint, './WINDOWS/system32/config/SAM') | ||
|
||
# Test path_hint with deleted file 'CAJA1S19.js' | ||
event = events[120480] | ||
event_data = self._GetEventDataOfEvent(storage_writer, event) | ||
self.assertEqual(event_data.name, 'CAJA1S19.js') | ||
self.assertEqual( | ||
event_data.path_hint, | ||
'./Documents and Settings/Donald Blake/Local Settings/Temporary Internet Files/Content.IE5/9EUWFPZ1/CAJA1S19.js') # pylint: disable=line-too-long | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please don't do these style overrides; there is a style guide for a reason. |
||
self.assertFalse(event_data.is_allocated) | ||
|
||
# Testing path_hint of orphaned entry '/session/menu.text.css' | ||
event = events[125436] | ||
event_data = self._GetEventDataOfEvent(storage_writer, event) | ||
file_parent_id = event_data.parent_file_reference & 0xffffffff | ||
file_parent_seq = event_data.parent_file_reference >> 48 | ||
self.assertEqual(event_data.name, 'menu.text.css') | ||
self.assertEqual(event_data.path_hint, '$Orphan/session/menu.text.css') | ||
|
||
event = events[125400] | ||
event_data = self._GetEventDataOfEvent(storage_writer, event) | ||
folder_allocation = event_data.is_allocated | ||
folder_id = event_data.file_reference & 0xffffffff | ||
folder_seq = event_data.file_reference >> 48 | ||
folder_parent_id = event_data.parent_file_reference & 0xffffffff | ||
folder_parent_seq = event_data.parent_file_reference >> 48 | ||
self.assertEqual(event_data.name, 'session') | ||
self.assertEqual(event_data.path_hint, '$Orphan/session') | ||
self.assertEqual(file_parent_id, folder_id) | ||
# Assert that the folders sequence is just one above the expected | ||
# sequence number from the file, and the folder is not allocated. | ||
# This is what to expect in this instance as it indicates the | ||
# folder in which the file resides was deleted but the file is | ||
# still associated, i.e. the folders' record was not reused | ||
self.assertTrue(file_parent_seq == 1 | ||
and folder_seq == 2 | ||
and not folder_allocation) | ||
|
||
event = events[101097] | ||
event_data = self._GetEventDataOfEvent(storage_writer, event) | ||
orphan_allocation = event_data.is_allocated | ||
orphan_id = event_data.file_reference & 0xffffffff | ||
orphan_seq = event_data.file_reference >> 48 | ||
self.assertEqual(folder_parent_id, orphan_id) | ||
# Now assert that the sequence number of the parent (the folder | ||
# above 'session') is larger than the expected value and the | ||
# record is allocated, i.e. the record has been reused | ||
self.assertGreater(orphan_seq, folder_parent_seq) | ||
self.assertTrue(orphan_allocation) | ||
|
||
def testParseImage(self): | ||
"""Tests the Parse function on a storage media image.""" | ||
parser = ntfs.NTFSMFTParser() | ||
|
@@ -163,6 +217,13 @@ def testParseImage(self): | |
self._TestGetMessageStrings( | ||
event_data, expected_message, expected_short_message) | ||
|
||
# Check that path hint is ok | ||
event = events[243] | ||
event_data = self._GetEventDataOfEvent(storage_writer, event) | ||
self.assertEqual( | ||
event_data.path_hint, | ||
'./System Volume Information/{3808876b-c176-4e48-b7ae-04046e6cc752}') | ||
|
||
# Note that the source file is a RAW (VMDK flat) image. | ||
test_file_path = self._GetTestFilePath(['multi_partition_image.vmdk']) | ||
self._SkipIfPathNotExists(test_file_path) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which version of pylint did you use?