diff --git a/nidmresults/test/check_consistency.py b/nidmresults/test/check_consistency.py index 4af8f12..b9f9f04 100644 --- a/nidmresults/test/check_consistency.py +++ b/nidmresults/test/check_consistency.py @@ -10,7 +10,7 @@ from rdflib import RDF -from nidmresults.objects.constants import * +from nidmresults.objects.constants import OWL RELPATH = os.path.dirname(os.path.abspath(__file__)) diff --git a/nidmresults/test/test_results_doc.py b/nidmresults/test/utils.py similarity index 74% rename from nidmresults/test/test_results_doc.py rename to nidmresults/test/utils.py index 801682b..a823397 100644 --- a/nidmresults/test/test_results_doc.py +++ b/nidmresults/test/utils.py @@ -18,7 +18,12 @@ from rdflib.graph import Graph from rdflib.namespace import RDF -from nidmresults.objects.constants_rdflib import * +from nidmresults.objects.constants_rdflib import ( + DCT, + NIDM_IN_COORDINATE_SPACE, + PROV, + XSD, +) from nidmresults.owl.owl_reader import OwlReader # Append parent script directory to path @@ -31,13 +36,10 @@ class TestResultDataModel: def get_readable_name(self, owl, graph, item): if isinstance(item, rdflib.term.Literal): - if item.datatype: - typeStr = graph.qname(item.datatype) - else: - typeStr = "" + typeStr = graph.qname(item.datatype) if item.datatype else "" if typeStr: - typeStr = "(" + typeStr + ")" - name = "'" + item + "'" + typeStr + typeStr = f"({typeStr})" + name = f"'{item}'{typeStr}" elif isinstance(item, rdflib.term.URIRef): # Look for label # name = graph.label(item) @@ -54,30 +56,20 @@ def get_readable_name(self, owl, graph, item): if m is not None: name += " (i.e. " + owl.get_label(item).split(":")[1] + ")" else: - name = "unsupported type: " + item + name = f"unsupported type: {item}" return name def get_alternatives(self, owl, graph, s=None, p=None, o=None): found = "" for s_in, p_in, o_in in graph.triples((s, p, o)): - if not o: - if not o_in.startswith(str(PROV)): - found += "; " + self.get_readable_name( - owl, - graph, - o_in, - ) - if not p: - if not p_in.startswith(str(PROV)): - found += "; " + self.get_readable_name(owl, graph, p_in) - if not s: - if not s_in.startswith(str(PROV)): - found += "; " + self.get_readable_name(owl, graph, s_in) - if len(found) > 200: - found = "" - else: - found = found[2:] + if not o and not o_in.startswith(str(PROV)): + found += f"; {self.get_readable_name(owl, graph, o_in)}" + if not p and not p_in.startswith(str(PROV)): + found += f"; {self.get_readable_name(owl, graph, p_in)}" + if not s and not s_in.startswith(str(PROV)): + found += f"; {self.get_readable_name(owl, graph, s_in)}" + found = "" if len(found) > 200 else found[2:] return found # FIXME: Extend tests to more than one dataset (group analysis, ...) @@ -92,7 +84,7 @@ def get_alternatives(self, owl, graph, s=None, p=None, o=None): def setUp(self, parent_gt_dir=None): self.my_execption = "" self.gt_dir = parent_gt_dir - self.ex_graphs = dict() + self.ex_graphs = {} def load_graph(self, ttl_name): if ttl_name not in self.ex_graphs: @@ -149,9 +141,9 @@ def print_results(self, res): def successful_retreive(self, res, info_str=""): """Check if the results query 'res' contains a value for each field.""" if not res.bindings: - self.my_execption = info_str + """: Empty query results""" + self.my_execption = f"""{info_str}: Empty query results""" return False - for idx, row in enumerate(res.bindings): + for row in res.bindings: for key, val in sorted(row.items()): logging.debug(f"{key}-->{val.decode()}") if not val.decode(): @@ -159,10 +151,6 @@ def successful_retreive(self, res, info_str=""): return False return True - # if not self.successful_retreive(self.spmexport.query(query), - # 'ContrastMap and ContrastStandardErrorMap'): - # raise Exception(self.my_execption) - def _replace_match(self, graph1, graph2, rdf_type): """Match classes of type 'rdf_type' across documents \ based on attributes.""" @@ -271,54 +259,53 @@ def _replace_match(self, graph1, graph2, rdf_type): match_found = False g2_matched = set() for g2_term, match_index in list(g2_match.items()): - if max(g2_match.values()) >= min_matching: - if ( - match_index == max(g2_match.values()) - ) and g2_term not in g2_matched: - # Found matching term - g2_matched.add(g2_term) - - if not g1_term == g2_term: - g2_name = graph2.qname(g2_term).split(":")[-1] - new_id = g1_term + "_" + g2_name - logging.debug( - graph1.qname(g1_term) - + " is matched to " - + graph2.qname(g2_term) - + " and replaced by " - + graph2.qname(new_id) - + " (match=" - + str(match_index) - + ")" - ) + if max(g2_match.values()) >= min_matching and ( + (match_index == max(g2_match.values())) + and g2_term not in g2_matched + ): + g2_matched.add(g2_term) + + if not g1_term == g2_term: + g2_name = graph2.qname(g2_term).split(":")[-1] + new_id = g1_term + "_" + g2_name + logging.debug( + graph1.qname(g1_term) + + " is matched to " + + graph2.qname(g2_term) + + " and replaced by " + + graph2.qname(new_id) + + " (match=" + + str(match_index) + + ")" + ) - for p, o in graph1.predicate_objects(g1_term): - graph1.remove((g1_term, p, o)) - graph1.add((new_id, p, o)) - for p, o in graph2.predicate_objects(g2_term): - graph2.remove((g2_term, p, o)) - graph2.add((new_id, p, o)) - for s, p in graph1.subject_predicates(g1_term): - graph1.remove((s, p, g1_term)) - graph1.add((s, p, new_id)) - for s, p in graph2.subject_predicates(g2_term): - graph2.remove((s, p, g2_term)) - graph2.add((s, p, new_id)) - - g2_terms.remove(g2_term) - g2_terms.add(new_id) - else: - logging.debug( - graph1.qname(g1_term) - + " is matched to " - + graph2.qname(g2_term) - + " (match=" - + str(match_index) - + ")" - ) + for p, o in graph1.predicate_objects(g1_term): + graph1.remove((g1_term, p, o)) + graph1.add((new_id, p, o)) + for p, o in graph2.predicate_objects(g2_term): + graph2.remove((g2_term, p, o)) + graph2.add((new_id, p, o)) + for s, p in graph1.subject_predicates(g1_term): + graph1.remove((s, p, g1_term)) + graph1.add((s, p, new_id)) + for s, p in graph2.subject_predicates(g2_term): + graph2.remove((s, p, g2_term)) + graph2.add((s, p, new_id)) + + g2_terms.remove(g2_term) + g2_terms.add(new_id) + else: + logging.debug( + graph1.qname(g1_term) + + " is matched to " + + graph2.qname(g2_term) + + " (match=" + + str(match_index) + + ")" + ) - match_found = True - break + match_found = True + break if not match_found: logging.debug("No match found for " + graph1.qname(g1_term)) @@ -362,9 +349,9 @@ def compare_full_graphs( gt_graph, other_graph ) - in_both, in_gt, in_other = graph_diff(gt_graph, other_graph) + _, in_gt, in_other = graph_diff(gt_graph, other_graph) - exc_missing = list() + exc_missing = [] for s, p, o in in_gt: # If there is a corresponding s,p check if @@ -388,19 +375,18 @@ def compare_full_graphs( ) ) - exc_added = list() + exc_added = [] if not include: - for s, p, o in in_other: - if p not in to_ignore: - exc_added.append( - "\nAdded :\t '%s %s %s'" - % ( - self.get_readable_name(owl, other_graph, s), - self.get_readable_name(owl, other_graph, p), - self.get_readable_name(owl, other_graph, o), - ) - ) - + exc_added.extend( + "\nAdded :\t '%s %s %s'" + % ( + self.get_readable_name(owl, other_graph, s), + self.get_readable_name(owl, other_graph, p), + self.get_readable_name(owl, other_graph, o), + ) + for s, p, o in in_other + if p not in to_ignore + ) my_exception += "".join(sorted(exc_missing) + sorted(exc_added)) if raise_now and my_exception: @@ -432,22 +418,23 @@ def _same_json_or_float(self, o, o_other): same_json_array = False numeric_types = [XSD.float, XSD.double, XSD.long, XSD.int] - if o.datatype in numeric_types: - if o_other.datatype in numeric_types: - # If both are zero but of different type isclose returns - # false - if o.value == 0 and o_other.value == 0: - close_float = True - - # Avoid None - if o.value and o_other.value: - close_float = np.isclose(o.value, o_other.value) - - if o.datatype in [XSD.string, None]: - if o_other.datatype in [XSD.string, None]: - if o.value == o_other.value: - same_str = True - + if ( + o.datatype in numeric_types + and o_other.datatype in numeric_types + ): + if o.value == 0 and o_other.value == 0: + close_float = True + + # Avoid None + if o.value and o_other.value: + close_float = np.isclose(o.value, o_other.value) + + if ( + o.datatype in [XSD.string, None] + and o_other.datatype in [XSD.string, None] + and o.value == o_other.value + ): + same_str = True return (same_json_array, close_float, same_str)