Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PICARD-2757: Add a command-line option to enable audit #2316

Merged
merged 10 commits into from
Sep 18, 2023
96 changes: 96 additions & 0 deletions picard/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
#
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2023 Laurent Monin
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from collections import defaultdict
import sys
import threading
import time


def setup_audit(prefixes_string):
    """Install an audit hook reporting events selected by the `audit` command-line option

    `prefixes_string` is a comma-separated list of dot-separated event name
    prefixes (e.g. "os,socket.connect"). If one of the entries is `all`,
    every event is reported. Nothing is installed if the string is empty
    or None.

    Each reported event is printed to standard output as:
        audit:<matched prefix>:<native thread id>:<elapsed seconds> <event> args=<args>
    where elapsed seconds are counted from the call to this function.

    If `sys.addaudithook()` is not available (Python < 3.8), no hook is
    installed and the function returns silently.
    """
    if not prefixes_string:
        return

    if 'all' in prefixes_string.split(','):
        def event_match(event):
            return ('all', )
    else:
        # the lookup dict only depends on the option value, so build it once
        # instead of on every audited event
        prefixes_dict = make_prefixes_dict(prefixes_string)

        def event_match(event):
            return is_matching_a_prefix(event, prefixes_dict)

    # Use a monotonic clock for elapsed time: the wall clock (time.time())
    # can jump backwards or forwards when the system time is adjusted.
    start_time = time.monotonic()

    def audit(event, args):
        matched = event_match(event)
        if not matched:
            return
        matched = '.'.join(matched)
        tid = threading.get_native_id()
        secs = time.monotonic() - start_time
        # we can't use log here, as it generates events
        print(f'audit:{matched}:{tid}:{secs} {event} args={args}')

    try:
        sys.addaudithook(audit)
    except AttributeError:
        # sys.addaudithook() appeared in Python 3.8
        pass


def list_from_prefixes_string(prefixes_string):
    """Yield the unique prefix tuples of a prefixes string, in sorted order

    A prefixes string is a comma-separated list of dot-separated keys.
    Empty entries are ignored and duplicates are yielded only once, so
    "a,b.c,d.e.f,,g" yields, in this order:
        ('a',), ('b', 'c'), ('d', 'e', 'f'), ('g',)
    """
    unique_prefixes = {
        tuple(entry.split('.'))
        for entry in prefixes_string.split(',')
        if entry
    }
    for prefix in sorted(unique_prefixes):
        yield prefix


def make_prefixes_dict(prefixes_string):
    """Group the prefix tuples of a prefixes string by their length

    Returns a plain dict mapping a number of parts to the list of prefix
    tuples having that many parts, e.g. "a,a.b,c.d" gives
        {1: [('a',)], 2: [('a', 'b'), ('c', 'd')]}
    """
    by_length = {}
    for prefix in list_from_prefixes_string(prefixes_string):
        by_length.setdefault(len(prefix), []).append(prefix)
    return by_length


def prefixes_candidates_for_length(length, prefixes_dict):
    """Yield the prefixes short enough to match a key made of `length` parts

    `prefixes_dict` maps a prefix length to a list of prefixes; only the
    lists stored under a length lower than or equal to `length` are yielded,
    following the dict iteration order.
    """
    for size, candidates in prefixes_dict.items():
        if size > length:
            # a prefix longer than the key can never match it
            continue
        for candidate in candidates:
            yield candidate


def is_matching_a_prefix(key, prefixes_dict):
    """Match a dot-separated key against the prefixes of `prefixes_dict`

    A prefix matches when its parts are equal to the leading parts of the key:
    `os.mkdir` is matched by prefix `os` or `os.mkdir`, but a key named `os`
    is not matched by prefix `os.mkdir`.

    Returns the first matching prefix tuple, or False if none matches.
    """
    key_parts = tuple(key.split('.'))
    # prefixes longer than the key are filtered out, they can't match
    for candidate in prefixes_candidates_for_length(len(key_parts), prefixes_dict):
        # every part of the prefix has to equal the key part at the same position
        if key_parts[:len(candidate)] == candidate:
            return candidate
    return False
8 changes: 8 additions & 0 deletions picard/tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
NatAlbum,
run_album_post_removal_processors,
)
from picard.audit import setup_audit
from picard.browser.browser import BrowserIntegration
from picard.browser.filelookup import FileLookup
from picard.cluster import (
Expand Down Expand Up @@ -256,6 +257,9 @@ def __init__(self, picard_args, localedir, autoupdate, pipe_handler=None):
if picard_args.debug or "PICARD_DEBUG" in os.environ:
self.set_log_level(logging.DEBUG)

if picard_args.audit:
setup_audit(picard_args.audit)

# Main thread pool used for most background tasks
self.thread_pool = QtCore.QThreadPool(self)
# Two threads are needed for the pipe handler and command processing.
Expand Down Expand Up @@ -1437,6 +1441,10 @@ def process_picard_args():
parser.add_argument("-display", nargs=1, help=argparse.SUPPRESS)

# Picard specific arguments
parser.add_argument("-a", "--audit", action='store',
phw marked this conversation as resolved.
Show resolved Hide resolved
default=None,
help="audit events passed as a comma-separated list, prefixes supported, "
"use all to match any (see https://docs.python.org/3/library/audit_events.html#audit-events)")
parser.add_argument("-c", "--config-file", action='store',
default=None,
help="location of the configuration file")
Expand Down
87 changes: 87 additions & 0 deletions test/test_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
#
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2023 Laurent Monin
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import sys
import unittest
from unittest.mock import patch

from test.picardtestcase import PicardTestCase

from picard.audit import (
is_matching_a_prefix,
list_from_prefixes_string,
make_prefixes_dict,
prefixes_candidates_for_length,
setup_audit,
)


class AuditTest(PicardTestCase):
    """Tests for the prefixes parsing and matching helpers of picard.audit"""

    def test_list_from_prefixes_string(self):
        # (prefixes string, expected sorted list of unique prefix tuples)
        cases = (
            ('', []),
            ('a', [('a',)]),
            ('a,b', [('a',), ('b',)]),
            ('a,,b', [('a',), ('b',)]),
            ('a.c,,b.d.f', [('a', 'c'), ('b', 'd', 'f')]),
            ('b.d.f,a.c,', [('a', 'c'), ('b', 'd', 'f')]),
        )
        for prefixes_string, expected in cases:
            result = list(list_from_prefixes_string(prefixes_string))
            self.assertEqual(result, expected)

    def test_make_prefixes_dict(self):
        # (prefixes string, expected dict of prefixes grouped by length)
        cases = (
            ('', {}),
            ('a', {1: [('a',)]}),
            ('a.b', {2: [('a', 'b')]}),
            ('a.b,c.d,a.b', {2: [('a', 'b'), ('c', 'd')]}),
            ('a,a.b,,a.b.c', {1: [('a',)], 2: [('a', 'b')], 3: [('a', 'b', 'c')]}),
        )
        for prefixes_string, expected in cases:
            result = dict(make_prefixes_dict(prefixes_string))
            self.assertEqual(result, expected)

    def test_prefixes_candidates_for_length(self):
        prefixes = make_prefixes_dict('a,a.b,c.d,a.b.c,d.e.f,g.h.i')
        up_to_two = [('a',), ('a', 'b'), ('c', 'd')]
        everything = up_to_two + [('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'h', 'i')]
        # (key length, expected candidates)
        cases = (
            (0, []),
            (1, [('a',)]),
            (2, up_to_two),
            (3, everything),
            (4, everything),
        )
        for length, expected in cases:
            result = list(prefixes_candidates_for_length(length, prefixes))
            self.assertEqual(result, expected)

    def test_is_matching_a_prefix(self):
        prefixes = make_prefixes_dict('a.b')
        # (key, expected matched prefix or False)
        cases = (
            ('a', False),
            ('a.b', ('a', 'b')),
            ('a.b.c', ('a', 'b')),
            ('b.c', False),
        )
        for key, expected in cases:
            self.assertEqual(is_matching_a_prefix(key, prefixes), expected)


# Compare the whole version tuple with >=: the hook API exists from 3.8.0 on,
# which a strict "greater than (3, 8)" comparison only expresses through
# tuple-length ordering.
@unittest.skipUnless(sys.version_info >= (3, 8), "sys.addaudithook() available since Python 3.8")
class AuditHookTest(PicardTestCase):
    """Tests checking whether setup_audit() installs an audit hook

    sys.addaudithook is patched, so no real hook is added to the interpreter
    running the tests.
    """

    def test_setup_audit_1(self):
        # a non-empty prefixes string has to register a hook
        with patch('sys.addaudithook') as mock:
            setup_audit('a,b.c')
            self.assertTrue(mock.called)

    def test_setup_audit_2(self):
        # an empty prefixes string must not register anything
        with patch('sys.addaudithook') as mock:
            setup_audit('')
            self.assertFalse(mock.called)
Loading