Skip to content

Commit

Permalink
Defer FPRule counter updates during normalization
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed Oct 31, 2024
1 parent bade7ad commit 3aadaa8
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 10 deletions.
88 changes: 86 additions & 2 deletions src/MCPClient/lib/clientScripts/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
import shutil
import traceback
import uuid
from collections import defaultdict
from functools import wraps
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Optional
from typing import TypeVar

import django
import transcoder
Expand All @@ -20,17 +25,21 @@
import databaseFunctions
import fileOperations
from client.job import Job
from custom_handlers import get_script_logger
from dicts import ReplacementDict
from django.conf import settings as mcpclient_settings
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import F
from fpr.models import FPRule
from lib import setup_dicts
from main.models import Derivation
from main.models import File
from main.models import FileFormatVersion
from main.models import FileID

logger = get_script_logger("archivematica.mcp.client.normalize")

# Return codes
SUCCESS = 0
RULE_FAILED = 1
Expand Down Expand Up @@ -355,7 +364,75 @@ def get_default_rule(purpose: str) -> FPRule:
return FPRule.active.get(purpose="default_" + purpose)


def main(job: Job, opts: NormalizeArgs) -> int:
class DeferredFPRuleCounter:
    """Accumulate FPRule attempt/success/failure tallies in memory.

    Database writes are postponed until :meth:`save`, so a long-running
    normalization batch performs one short transaction at the end instead
    of many small writes along the way.
    """

    # Counter fields mirror the FPRule model's statistics columns.
    _FIELDS = ("count_attempts", "count_okay", "count_not_okay")

    def __init__(self) -> None:
        # Maps an FPRule UUID to its pending per-field increments.
        self._counters: DefaultDict[uuid.UUID, Dict[str, int]] = defaultdict(
            lambda: dict.fromkeys(self._FIELDS, 0)
        )

    def _bump(self, fprule: FPRule, field: str) -> None:
        # Shared increment path for all three record_* methods.
        self._counters[fprule.uuid][field] += 1

    def record_attempt(self, fprule: FPRule) -> None:
        self._bump(fprule, "count_attempts")

    def record_success(self, fprule: FPRule) -> None:
        self._bump(fprule, "count_okay")

    def record_failure(self, fprule: FPRule) -> None:
        self._bump(fprule, "count_not_okay")

    def save(self) -> None:
        """Persist all aggregated FPRule counters in a single transaction.

        Uses Django ``F()`` expressions so each increment happens on the
        database side, preventing lost updates from concurrent workers.
        """
        with transaction.atomic():
            for rule_uuid, deltas in self._counters.items():
                FPRule.objects.filter(uuid=rule_uuid).update(
                    count_attempts=F("count_attempts") + deltas["count_attempts"],
                    count_okay=F("count_okay") + deltas["count_okay"],
                    count_not_okay=F("count_not_okay") + deltas["count_not_okay"],
                )


DeferredResult = TypeVar("DeferredResult")


def deferred_fprule_counter(
    func: Callable[..., DeferredResult],
) -> Callable[..., DeferredResult]:
    """Inject a ``DeferredFPRuleCounter`` into ``func`` and flush it on exit.

    The counter is passed to ``func`` as the third positional argument,
    after ``job`` and ``opts``. Accumulated tallies are persisted once the
    wrapped function returns or raises, so counts survive failures of the
    wrapped function itself.
    """

    @wraps(func)
    def wrapper(
        job: Job, opts: NormalizeArgs, *args: object, **kwargs: object
    ) -> DeferredResult:
        counter = DeferredFPRuleCounter()
        try:
            return func(job, opts, counter, *args, **kwargs)
        finally:
            # Best-effort save: a failure to persist counters is logged but
            # never allowed to mask the wrapped function's result/exception.
            try:
                counter.save()
            except Exception as err:
                # logger.exception records the traceback automatically.
                logger.exception("Failed to save counters: %s", err)

    return wrapper


@deferred_fprule_counter
def main(job: Job, opts: NormalizeArgs, counter: DeferredFPRuleCounter) -> int:
"""Find and execute normalization commands on input file."""
# TODO fix for maildir working only on attachments

Expand Down Expand Up @@ -489,7 +566,13 @@ def main(job: Job, opts: NormalizeArgs) -> int:

replacement_dict = get_replacement_dict(job, opts)
cl = transcoder.CommandLinker(
job, rule, command, replacement_dict, opts, once_normalized_callback(job)
job,
rule,
command,
replacement_dict,
opts,
once_normalized_callback(job),
counter,
)
exitstatus = cl.execute()

Expand Down Expand Up @@ -540,6 +623,7 @@ def main(job: Job, opts: NormalizeArgs) -> int:
replacement_dict,
opts,
once_normalized_callback(job),
counter,
)
exitstatus = cl.execute()

Expand Down
15 changes: 7 additions & 8 deletions src/MCPClient/lib/clientScripts/transcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#
# You should have received a copy of the GNU General Public License
# along with Archivematica. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import F
from executeOrRunSubProcess import executeOrRun


Expand Down Expand Up @@ -109,7 +108,9 @@ def execute(self, skip_on_success=False):


class CommandLinker:
def __init__(self, job, fprule, command, replacement_dict, opts, on_success):
def __init__(
self, job, fprule, command, replacement_dict, opts, on_success, counter
):
self.fprule = fprule
self.command = command
self.replacement_dict = replacement_dict
Expand All @@ -118,6 +119,7 @@ def __init__(self, job, fprule, command, replacement_dict, opts, on_success):
self.commandObject = Command(
job, self.command, replacement_dict, self.on_success, opts
)
self.counter = counter

def __str__(self):
return (
Expand All @@ -128,13 +130,10 @@ def execute(self):
"""Execute the command, and track the success statistics.
Returns 0 on success, non-0 on failure."""
# Track success/failure rates of FP Rules
# Use Django's F() to prevent race condition updating the counts
self.fprule.count_attempts = F("count_attempts") + 1
self.counter.record_attempt(self.fprule)
ret = self.commandObject.execute()
if ret:
self.fprule.count_not_okay = F("count_not_okay") + 1
self.counter.record_failure(self.fprule)
else:
self.fprule.count_okay = F("count_okay") + 1
self.fprule.save()
self.counter.record_success(self.fprule)
return ret

0 comments on commit 3aadaa8

Please sign in to comment.