quality_control: remove imports that were deprecated in Python 3.9 (#8681)

This is a continuation of #8626.
SpecLad authored Nov 11, 2024
1 parent 04cd062 commit 1e7ff33
Showing 2 changed files with 43 additions and 41 deletions.
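
For context, both files apply the same mechanical rewrite described in PEP 585: the generic aliases in typing (List, Dict, Tuple, Sequence, Hashable, ...) are deprecated since Python 3.9, so container annotations switch to the builtin generics and the ABCs are imported from collections.abc instead. A minimal before/after sketch of the pattern (the group() function is a made-up example, not code from CVAT):

# Before: deprecated typing aliases (pre-3.9 style)
from typing import Dict, List, Optional, Sequence

def group(items: Sequence[str], limit: Optional[int] = None) -> Dict[str, List[str]]: ...

# After: builtin generics + collections.abc, as in this commit (Python 3.9+)
from collections.abc import Sequence
from typing import Optional

def group(items: Sequence[str], limit: Optional[int] = None) -> dict[str, list[str]]: ...

Optional is not part of the deprecation, which is why it stays imported from typing throughout the diff.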
3 changes: 2 additions & 1 deletion cvat/apps/quality_control/models.py
@@ -4,9 +4,10 @@

from __future__ import annotations

from collections.abc import Sequence
from copy import deepcopy
from enum import Enum
from typing import Any, Sequence
from typing import Any

from django.core.exceptions import ValidationError
from django.db import models
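
The models.py hunk above is the whole change in miniature: Sequence now comes from collections.abc, while Any stays in typing. The quality_reports.py diff below does the same, plus the List/Dict/Tuple to list/dict/tuple substitution in annotations. A small sketch (not part of the commit) of why this is a drop-in replacement on Python 3.9+, where builtin types and collections.abc classes are subscriptable and the old typing aliases resolve to the same origins:

# Illustrative check, assuming Python 3.9+; none of these names are CVAT code.
from collections.abc import Sequence
from typing import List, get_origin
from typing import Sequence as TypingSequence  # deprecated alias of collections.abc.Sequence

assert get_origin(list[str]) is list                # builtin generics work since 3.9
assert get_origin(List[str]) is list                # deprecated alias has the same origin
assert get_origin(Sequence[int]) is Sequence        # ABCs are subscriptable since 3.9
assert get_origin(TypingSequence[int]) is Sequence  # typing.Sequence resolves to the ABC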
81 changes: 41 additions & 40 deletions cvat/apps/quality_control/quality_reports.py
@@ -7,10 +7,11 @@
import itertools
import math
from collections import Counter
from collections.abc import Hashable, Sequence
from copy import deepcopy
from datetime import timedelta
from functools import cached_property, partial
from typing import Any, Callable, Dict, Hashable, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Callable, Optional, Union, cast

import datumaro as dm
import datumaro.util.mask_tools
@@ -77,7 +78,7 @@ def _value_serializer(self, v):
def to_dict(self) -> dict:
return self._value_serializer(self._fields_dict())

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
d = asdict(self, recurse=False)

for field_name in include_properties or []:
@@ -117,7 +118,7 @@ def from_dict(cls, d: dict):
class AnnotationConflict(_Serializable):
frame_id: int
type: AnnotationConflictType
annotation_ids: List[AnnotationId]
annotation_ids: list[AnnotationId]

@property
def severity(self) -> AnnotationConflictSeverity:
@@ -146,7 +147,7 @@ def _value_serializer(self, v):
else:
return super()._value_serializer(v)

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(include_properties=include_properties or ["severity"])

@classmethod
@@ -160,7 +161,7 @@ def from_dict(cls, d: dict):

@define(kw_only=True)
class ComparisonParameters(_Serializable):
included_annotation_types: List[dm.AnnotationType] = [
included_annotation_types: list[dm.AnnotationType] = [
dm.AnnotationType.bbox,
dm.AnnotationType.points,
dm.AnnotationType.mask,
@@ -176,7 +177,7 @@ class ComparisonParameters(_Serializable):
compare_attributes: bool = True
"Enables or disables attribute checks"

ignored_attributes: List[str] = []
ignored_attributes: list[str] = []

iou_threshold: float = 0.4
"Used for distinction between matched / unmatched shapes"
@@ -238,7 +239,7 @@ def from_dict(cls, d: dict):

@define(kw_only=True)
class ConfusionMatrix(_Serializable):
labels: List[str]
labels: list[str]
rows: np.ndarray
precision: np.ndarray
recall: np.ndarray
@@ -255,7 +256,7 @@ def _value_serializer(self, v):
else:
return super()._value_serializer(v)

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(include_properties=include_properties or ["axes"])

@classmethod
@@ -305,7 +306,7 @@ def accumulate(self, other: ComparisonReportAnnotationsSummary):
]:
setattr(self, field, getattr(self, field) + getattr(other, field))

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(
include_properties=include_properties or ["accuracy", "precision", "recall"]
)
@@ -348,7 +349,7 @@ def accumulate(self, other: ComparisonReportAnnotationShapeSummary):
]:
setattr(self, field, getattr(self, field) + getattr(other, field))

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(include_properties=include_properties or ["accuracy"])

@classmethod
@@ -378,7 +379,7 @@ def accumulate(self, other: ComparisonReportAnnotationLabelSummary):
for field in ["valid_count", "total_count", "invalid_count"]:
setattr(self, field, getattr(self, field) + getattr(other, field))

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(include_properties=include_properties or ["accuracy"])

@classmethod
@@ -410,7 +411,7 @@ def from_dict(cls, d: dict):
@define(kw_only=True)
class ComparisonReportComparisonSummary(_Serializable):
frame_share: float
frames: List[str]
frames: list[str]

@property
def mean_conflict_count(self) -> float:
@@ -419,7 +420,7 @@ def mean_conflict_count(self) -> float:
conflict_count: int
warning_count: int
error_count: int
conflicts_by_type: Dict[AnnotationConflictType, int]
conflicts_by_type: dict[AnnotationConflictType, int]

annotations: ComparisonReportAnnotationsSummary
annotation_components: ComparisonReportAnnotationComponentsSummary
@@ -434,7 +435,7 @@ def _value_serializer(self, v):
else:
return super()._value_serializer(v)

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(
include_properties=include_properties
or [
@@ -466,7 +467,7 @@ def from_dict(cls, d: dict):

@define(kw_only=True, init=False)
class ComparisonReportFrameSummary(_Serializable):
conflicts: List[AnnotationConflict]
conflicts: list[AnnotationConflict]

@cached_property
def conflict_count(self) -> int:
@@ -481,7 +482,7 @@ def error_count(self) -> int:
return len([c for c in self.conflicts if c.severity == AnnotationConflictSeverity.ERROR])

@cached_property
def conflicts_by_type(self) -> Dict[AnnotationConflictType, int]:
def conflicts_by_type(self) -> dict[AnnotationConflictType, int]:
return Counter(c.type for c in self.conflicts)

annotations: ComparisonReportAnnotationsSummary
@@ -503,7 +504,7 @@ def __init__(self, *args, **kwargs):

self.__attrs_init__(*args, **kwargs)

def _fields_dict(self, *, include_properties: Optional[List[str]] = None) -> dict:
def _fields_dict(self, *, include_properties: Optional[list[str]] = None) -> dict:
return super()._fields_dict(include_properties=include_properties or self._CACHED_FIELDS)

@classmethod
@@ -534,14 +535,14 @@ def from_dict(cls, d: dict):
class ComparisonReport(_Serializable):
parameters: ComparisonParameters
comparison_summary: ComparisonReportComparisonSummary
frame_results: Dict[int, ComparisonReportFrameSummary]
frame_results: dict[int, ComparisonReportFrameSummary]

@property
def conflicts(self) -> List[AnnotationConflict]:
def conflicts(self) -> list[AnnotationConflict]:
return list(itertools.chain.from_iterable(r.conflicts for r in self.frame_results.values()))

@classmethod
def from_dict(cls, d: Dict[str, Any]) -> ComparisonReport:
def from_dict(cls, d: dict[str, Any]) -> ComparisonReport:
return cls(
parameters=ComparisonParameters.from_dict(d["parameters"]),
comparison_summary=ComparisonReportComparisonSummary.from_dict(d["comparison_summary"]),
@@ -632,7 +633,7 @@ def get_source_ann(
def clear(self):
self._annotation_mapping.clear()

def __call__(self, *args, **kwargs) -> List[dm.Annotation]:
def __call__(self, *args, **kwargs) -> list[dm.Annotation]:
converter = _MemoizingAnnotationConverter(*args, factory=self, **kwargs)
return converter.convert()

@@ -861,7 +862,7 @@ def _compare_lines(self, a: np.ndarray, b: np.ndarray) -> float:
return sum(np.exp(-(dists**2) / (2 * scale * (2 * self.torso_r) ** 2))) / len(a)

@classmethod
def approximate_points(cls, a: np.ndarray, b: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
def approximate_points(cls, a: np.ndarray, b: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Creates 2 polylines with the same numbers of points,
the points are placed on the original lines with the same step.
@@ -959,7 +960,7 @@ def __init__(
self,
categories: dm.CategoriesInfo,
*,
included_ann_types: Optional[List[dm.AnnotationType]] = None,
included_ann_types: Optional[list[dm.AnnotationType]] = None,
return_distances: bool = False,
iou_threshold: float = 0.5,
# https://cocodataset.org/#keypoints-eval
@@ -994,7 +995,7 @@ def __init__(

def _instance_bbox(
self, instance_anns: Sequence[dm.Annotation]
) -> Tuple[float, float, float, float]:
) -> tuple[float, float, float, float]:
return dm.ops.max_bbox(
a.get_bbox() if isinstance(a, dm.Skeleton) else a
for a in instance_anns
@@ -1141,7 +1142,7 @@ def _find_instances(annotations):
return instances, instance_map

def _get_compiled_mask(
anns: Sequence[dm.Annotation], *, instance_ids: Dict[int, int]
anns: Sequence[dm.Annotation], *, instance_ids: dict[int, int]
) -> dm.CompiledMask:
if not anns:
return None
@@ -1583,7 +1584,7 @@ def match_attrs(self, ann_a: dm.Annotation, ann_b: dm.Annotation):

def find_groups(
self, item: dm.DatasetItem
) -> Tuple[Dict[int, List[dm.Annotation]], Dict[int, int]]:
) -> tuple[dict[int, list[dm.Annotation]], dict[int, int]]:
ann_groups = dm.ops.find_instances(
[
ann
@@ -1632,7 +1633,7 @@ def _group_distance(gt_group_id, ds_group_id):

return ds_to_gt_groups

def find_covered(self, item: dm.DatasetItem) -> List[dm.Annotation]:
def find_covered(self, item: dm.DatasetItem) -> list[dm.Annotation]:
# Get annotations that can cover or be covered
spatial_types = {
dm.AnnotationType.polygon,
@@ -1707,7 +1708,7 @@ def __init__(
self._ds_dataset = self._ds_data_provider.dm_dataset
self._gt_dataset = self._gt_data_provider.dm_dataset

self._frame_results: Dict[int, ComparisonReportFrameSummary] = {}
self._frame_results: dict[int, ComparisonReportFrameSummary] = {}

self.comparator = _Comparator(self._gt_dataset.categories(), settings=settings)

@@ -1744,7 +1745,7 @@ def _find_gt_conflicts(self):

def _process_frame(
self, ds_item: dm.DatasetItem, gt_item: dm.DatasetItem
) -> List[AnnotationConflict]:
) -> list[AnnotationConflict]:
frame_id = self._dm_item_to_frame_id(ds_item, self._ds_dataset)

frame_results = self.comparator.match_annotations(gt_item, ds_item)
@@ -1756,7 +1757,7 @@ def _process_frame(

def _generate_frame_annotation_conflicts(
self, frame_id: str, frame_results, *, gt_item: dm.DatasetItem, ds_item: dm.DatasetItem
) -> List[AnnotationConflict]:
) -> list[AnnotationConflict]:
conflicts = []

matches, mismatches, gt_unmatched, ds_unmatched, _ = frame_results["all_ann_types"]
@@ -2017,7 +2018,7 @@ def _find_closest_unmatched_shape(shape: dm.Annotation):
# row/column index in the confusion matrix corresponding to unmatched annotations
_UNMATCHED_IDX = -1

def _make_zero_confusion_matrix(self) -> Tuple[List[str], np.ndarray, Dict[int, int]]:
def _make_zero_confusion_matrix(self) -> tuple[list[str], np.ndarray, dict[int, int]]:
label_id_idx_map = {}
label_names = []
for label_id, label in enumerate(self._gt_dataset.categories()[dm.AnnotationType.label]):
@@ -2033,7 +2034,7 @@ def _make_zero_confusion_matrix(self) -> Tuple[List[str], np.ndarray, Dict[int,
return label_names, confusion_matrix, label_id_idx_map

def _compute_annotations_summary(
self, confusion_matrix: np.ndarray, confusion_matrix_labels: List[str]
self, confusion_matrix: np.ndarray, confusion_matrix_labels: list[str]
) -> ComparisonReportAnnotationsSummary:
matched_ann_counts = np.diag(confusion_matrix)
ds_ann_counts = np.sum(confusion_matrix, axis=1)
@@ -2076,7 +2077,7 @@ def _compute_annotations_summary(
)

def _generate_frame_annotations_summary(
self, confusion_matrix: np.ndarray, confusion_matrix_labels: List[str]
self, confusion_matrix: np.ndarray, confusion_matrix_labels: list[str]
) -> ComparisonReportAnnotationsSummary:
summary = self._compute_annotations_summary(confusion_matrix, confusion_matrix_labels)

@@ -2090,8 +2091,8 @@ def _generate_frame_annotations_summary(
return summary

def _generate_dataset_annotations_summary(
self, frame_summaries: Dict[int, ComparisonReportFrameSummary]
) -> Tuple[ComparisonReportAnnotationsSummary, ComparisonReportAnnotationComponentsSummary]:
self, frame_summaries: dict[int, ComparisonReportFrameSummary]
) -> tuple[ComparisonReportAnnotationsSummary, ComparisonReportAnnotationComponentsSummary]:
# accumulate stats
annotation_components = ComparisonReportAnnotationComponentsSummary(
shape=ComparisonReportAnnotationShapeSummary(
@@ -2372,7 +2373,7 @@ def _compute_reports(self, task_id: int) -> int:
in active_validation_frames
)

jobs: List[Job] = [j for j in job_queryset if j.type == JobType.ANNOTATION]
jobs: list[Job] = [j for j in job_queryset if j.type == JobType.ANNOTATION]
job_data_providers = {
job.id: JobDataProvider(
job.id,
@@ -2384,7 +2385,7 @@ def _compute_reports(self, task_id: int) -> int:

quality_params = self._get_task_quality_params(task)

job_comparison_reports: Dict[int, ComparisonReport] = {}
job_comparison_reports: dict[int, ComparisonReport] = {}
for job in jobs:
job_data_provider = job_data_providers[job.id]
comparator = DatasetComparator(
@@ -2449,14 +2450,14 @@ def _get_current_job(self):
return get_current_job()

def _compute_task_report(
self, task: Task, job_reports: Dict[int, ComparisonReport]
self, task: Task, job_reports: dict[int, ComparisonReport]
) -> ComparisonReport:
# The task dataset can be different from any jobs' dataset because of frame overlaps
# between jobs, from which annotations are merged to get the task annotations.
# Thus, a separate report could be computed for the task. Instead, here we only
# compute the combined summary of the job reports.
task_intersection_frames = set()
task_conflicts: List[AnnotationConflict] = []
task_conflicts: list[AnnotationConflict] = []
task_annotations_summary = None
task_ann_components_summary = None
task_mean_shape_ious = []
@@ -2533,7 +2534,7 @@ def _compute_task_report(

return task_report_data

def _save_reports(self, *, task_report: Dict, job_reports: List[Dict]) -> models.QualityReport:
def _save_reports(self, *, task_report: dict, job_reports: list[dict]) -> models.QualityReport:
# TODO: add validation (e.g. ann id count for different types of conflicts)

db_task_report = models.QualityReport(
