From 02738af61f2271e895f357ba64c5166f31820bf0 Mon Sep 17 00:00:00 2001 From: Ashwin Krishnan Date: Sun, 15 Sep 2024 22:03:43 +0000 Subject: [PATCH] updated graph db graphQL queries and tests --- .vscode/settings.json | 3 +- .../src/uns_graphdb/graphdb_handler.py | 4 +- 07_uns_graphql/schema/uns_schema.graphql | 108 ++++- .../src/uns_graphql/backend/graphdb.py | 25 +- .../src/uns_graphql/queries/graph.py | 456 ++++++++++++++++-- .../src/uns_graphql/queries/queries.cypher | 166 ++++--- 07_uns_graphql/test/backend/test_graphdb.py | 71 ++- 07_uns_graphql/test/queries/test_graph.py | 408 +++++++++++++++- .../test/type/test_sparkplugb_node.py | 29 +- 9 files changed, 1135 insertions(+), 135 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 76403c40..6dd14153 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,6 +19,7 @@ "createdb", "createrole", "createuser", + "cypher", "DBIRTH", "dbuser", "DCMD", @@ -112,7 +113,7 @@ "./07_uns_graphql/src", "./07_uns_graphql/test", ], - "python.languageServer": "Pylance", + "python.languageServer": "Default", "python.missingPackage.severity": "Error", "ruff.showNotifications": "always", "ruff.trace.server": "messages", diff --git a/03_uns_graphdb/src/uns_graphdb/graphdb_handler.py b/03_uns_graphdb/src/uns_graphdb/graphdb_handler.py index aa23ef31..3d22f2b1 100644 --- a/03_uns_graphdb/src/uns_graphdb/graphdb_handler.py +++ b/03_uns_graphdb/src/uns_graphdb/graphdb_handler.py @@ -103,8 +103,8 @@ def connect(self, retry: int = 0) -> neo4j.Driver: ---------- retry: int Optional parameters to retry making a connection in case of errors. - The max number of retry is `GraphDBHandler.MAX_RETRIES` - The time between attempts is `GraphDBHandler.SLEEP_BTW_ATTEMPT` + The max number of retry is `GraphDBHandler.max_retry` + The time between attempts is `GraphDBHandler.sleep_btw_attempts` Returns: neo4j.Driver: The Neo4j driver object. diff --git a/07_uns_graphql/schema/uns_schema.graphql b/07_uns_graphql/schema/uns_schema.graphql index 59364ccf..198640bd 100644 --- a/07_uns_graphql/schema/uns_schema.graphql +++ b/07_uns_graphql/schema/uns_schema.graphql @@ -77,15 +77,102 @@ type Query { """ getUnsNodes(topics: [MQTTTopicInput!]!): [UNSNode!]! - """ - Get all UNSNodes published which have specific attributes.Option binary_operator input allows chaining the list of property_keys. If NULL, property_keys will be ORed- OR: Either of the property_keys must exist in the same node. If only one property_keys provided will be ignored- AND: All property_keys must exist in same node. If only one property_keys provided will be ignored- NOT: None of the provided property_keys should exist in the same nodeOther criteria - topics will always be ANDed to the query filter - """ - getUnsNodesByProperty(propertyKeys: [String!]!, binaryOperator: BinaryOperator, topics: [MQTTTopicInput!]): [UNSNode!]! + "Get all UNSNodes published which have specific attribute name as 'property_keys'. \n Optionally Filter results with list of topics. \nIf topics are provided then optional boolean value exclude_topics attribute can be set to true .\nto exclude the nodes which match the topics.\n" + getUnsNodesByProperty(propertyKeys: [String!]!, topics: [MQTTTopicInput!], excludeTopics: Boolean = false): [UNSNode!]! - """ - Get consolidation of sparkplugB nodes. MQTT wildcards are supportedEnsure that the topics are all in the SparkplugB namespace i.e. starts with 'spBv1.0/' Format of an SpB topic is spBv1.0/ / / / [] - """ - getSpbNodes(topics: [MQTTTopicInput!]!): [UNSNode!]! + """Get all the SPBNode by the provided metric name""" + getSpbNodesByMetric(metricNames: [String!]!): [SPBNode!]! +} + +type SPBDataSet { + numOfColumns: Union! + columns: [String!]! + types: [String!]! + rows: [SPBDataSetRow!]! +} + +type SPBDataSetRow { + elements: [SPBDataSetValue!]! +} + +type SPBDataSetValue { + value: SPBPrimitive! +} + +type SPBMetadata { + isMultiPart: Boolean + contentType: String + size: Union + seq: Union + fileName: String + fileType: String + md5: String + description: String +} + +type SPBMetric { + name: String! + alias: Union + timestamp: DateTime! + datatype: String! + isHistorical: Boolean + isTransient: Boolean + isNull: Boolean + metadata: SPBMetadata + properties: SPBPropertySet + value: SPBPrimitiveBytesPayloadSPBDataSetSPBTemplate! +} + +type SPBNode { + topic: String! + timestamp: DateTime! + metrics: [SPBMetric!]! + seq: Union! + uuid: ID + body: Base64 +} + +""" +Wrapper for primitive types in Sparkplug.: int, float, str, bool, list + Needed because GraphQL does not support str for unions. + Data is converted to its String representation for convenience. + Use the datatype to convert to actual type if needed +""" +type SPBPrimitive { + data: String! +} + +union SPBPrimitiveBytesPayloadSPBDataSetSPBTemplate = SPBPrimitive | BytesPayload | SPBDataSet | SPBTemplate + +union SPBPrimitiveSPBPropertySetSPBPropertySetList = SPBPrimitive | SPBPropertySet | SPBPropertySetList + +type SPBPropertySet { + keys: [String!]! + values: [SPBPropertyValue!]! +} + +type SPBPropertySetList { + propertysets: [SPBPropertySet!]! +} + +type SPBPropertyValue { + isNull: Boolean + datatype: String! + value: SPBPrimitiveSPBPropertySetSPBPropertySetList! +} + +type SPBTemplate { + version: String + metrics: [SPBMetric!]! + parameters: [SPBTemplateParameter!] + templateRef: String + isDefinition: Boolean +} + +type SPBTemplateParameter { + name: String! + datatype: String! + value: SPBPrimitive! } type StreamingMessage { @@ -112,4 +199,7 @@ type UNSNode { payload: JSONPayload! created: DateTime! lastUpdated: DateTime! -} \ No newline at end of file +} + +"""Int 64 field since GraphQL doesn't support int64, only int 32""" +scalar Union \ No newline at end of file diff --git a/07_uns_graphql/src/uns_graphql/backend/graphdb.py b/07_uns_graphql/src/uns_graphql/backend/graphdb.py index 1a8da3ca..6ea9f0e0 100644 --- a/07_uns_graphql/src/uns_graphql/backend/graphdb.py +++ b/07_uns_graphql/src/uns_graphql/backend/graphdb.py @@ -18,6 +18,7 @@ Encapsulates integration with the Graph database database """ +import asyncio import logging from neo4j import AsyncDriver, AsyncGraphDatabase, AsyncResult, Record @@ -26,6 +27,9 @@ LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 5 +SLEEP_BTW_ATTEMPT = 10 + class GraphDB: """ @@ -37,17 +41,24 @@ class GraphDB: _graphdb_driver: AsyncDriver = None @classmethod - async def get_graphdb_driver(cls) -> AsyncDriver: + async def get_graphdb_driver(cls, retry: int = 0) -> AsyncDriver: """ Returns the Neo4j async driver which is the connection to the database. Validates if the current driver is still connected, and if not, creates a new connection. The driver is cached and reused for subsequent requests to avoid creating multiple connections. + Parameters + ---------- + retry: int + Optional parameters to retry making a connection in case of errors. + The max number of retry is `MAX_RETRIES` + The time between attempts is `SLEEP_BTW_ATTEMPT` seconds Returns: AsyncDriver: The Neo4j async driver. """ LOGGER.debug("GraphDB driver requested") + current_loop = asyncio.get_event_loop() if cls._graphdb_driver is None: LOGGER.info("Creating a new GraphDB driver") cls._graphdb_driver = AsyncGraphDatabase.driver( @@ -60,11 +71,13 @@ async def get_graphdb_driver(cls) -> AsyncDriver: LOGGER.error("Failed to verify GraphDB driver connectivity: %s", str(ex), stack_info=True, exc_info=True) # In case of connectivity failure, close the existing driver and create a new one await cls.release_graphdb_driver() - cls._graphdb_driver = AsyncGraphDatabase.driver( - uri=GraphDBConfig.conn_url, auth=(GraphDBConfig.user, GraphDBConfig.password) - ) - await cls._graphdb_driver.verify_connectivity() - LOGGER.info("Reconnected to GraphDB driver") + if retry >= MAX_RETRIES: + LOGGER.error("No. of retries exceeded %s", str(MAX_RETRIES), stack_info=True, exc_info=True) + raise ex # Re-raise the exception after cleanup + + await asyncio.sleep(SLEEP_BTW_ATTEMPT) + cls._graphdb_driver = await cls.get_graphdb_driver(retry + 1) + LOGGER.info("Trying to Reconnect to GraphDB driver") return cls._graphdb_driver diff --git a/07_uns_graphql/src/uns_graphql/queries/graph.py b/07_uns_graphql/src/uns_graphql/queries/graph.py index c3f1ca63..c730f27c 100644 --- a/07_uns_graphql/src/uns_graphql/queries/graph.py +++ b/07_uns_graphql/src/uns_graphql/queries/graph.py @@ -19,17 +19,32 @@ """ import logging -from typing import Optional +from datetime import UTC, datetime +from typing import Any, Optional import strawberry +from neo4j import Record +from neo4j.graph import Node, Relationship +from uns_mqtt.mqtt_listener import UnsMQTTClient from uns_graphql.backend.graphdb import GraphDB +from uns_graphql.graphql_config import GraphDBConfig from uns_graphql.input.mqtt import MQTTTopicInput -from uns_graphql.type.basetype import BinaryOperator from uns_graphql.type.isa95_node import UNSNode +from uns_graphql.type.sparkplugb_node import SPBNode LOGGER = logging.getLogger(__name__) +# Constants used for node and relationship key properties +NODE_NAME_KEY = "node_name" +CREATED_TIMESTAMP_KEY = "_created_timestamp" +MODIFIED_TIMESTAMP_KEY = "_modified_timestamp" + +NODE_RELATION_NAME = "PARENT_OF" +REL_ATTR_KEY = "attribute_name" +REL_ATTR_TYPE = "type" +REL_INDEX = "index" + @strawberry.type(description="Query GraphDB for current consolidated UNS Nodes created by merging multiple UNS Events ") class Query: @@ -37,54 +52,340 @@ class Query: All Queries for latest consolidated node from the Unified Namespace """ + # Label filters to be used in the queries + UNS_LABEL_FILTER = "|".join(GraphDBConfig.uns_node_types) + SPB_LABEL_FILTER = "|".join(GraphDBConfig.spb_node_types) + + # + """ + Template for the query to search by property. following variables used in the query + {0} - label filter to determine if the query is for UNS or SPB i.e. UNS_LABEL_FILTER or SPB_LABEL_FILTER + {1} - label filter for NESTED_ATTRIBUTE i.e GraphDBConfig.nested_attribute_node_type + {2} - optional subquery for including or excluding nodes based on topics. Pass empty string if no topics are present + + While running the query the propertyNames array and the topicFilter array values are passed in as parameters. + Ensure that topicFilter is an array of regex expressions. Convert the topic wildcards to regex. + + The result of the query is a list of + - fullName: topic or path till the node, + - resultNode: the node matching the query, + - nestedChildren: all the nested children of the node of type GraphDBConfig.nested_attribute_node_type + - relationships: between resultNode and nestedChildren & between nestedChildren if more than 1 level of nesting + """ + _SEARCH_BY_PROPERTY_QUERY = """WITH $propertyNames AS propertyNames + UNWIND propertyNames AS propertyName + // Step 1: Find all nodes containing the specified property + // Use a sub query to handle both MATCH conditions + CALL (propertyName) {{ + // Match nodes that directly contain the specified property + MATCH (simple_node:{0}) // dynamically add the label filter here + WHERE simple_node[propertyName] IS NOT NULL + RETURN DISTINCT simple_node AS resultNode + UNION + // Match nodes that are related via a specific relationship property + MATCH (nested_node:{0})-[r:PARENT_OF {{attribute_name: propertyName}}]->(:{1}) + WHERE r.type IN ["list", "dict"] + RETURN DISTINCT nested_node AS resultNode + }} + + // Step 2: Use APOC to find the path from each node to the root, excluding '{1}' nodes + CALL apoc.path.subgraphNodes(resultNode, {{ + relationshipFilter: 'PARENT_OF<', + labelFilter: '-{1}', + maxLevel: -1 + }}) YIELD node AS pathNode + + // Step 3: Collect the nodes along the path and construct the full name + WITH resultNode, + COLLECT(pathNode) AS pathNodes + WITH resultNode, + REDUCE(fullPath = '', n IN pathNodes | + CASE + WHEN fullPath = '' THEN n.node_name + ELSE n.node_name + '/' + fullPath + END) AS fullName + + {2} + + // Step 5: Find nested children with label "{1}" and their relationships + + OPTIONAL MATCH (resultNode)-[r:PARENT_OF]->(nestedChild:{1}) + OPTIONAL MATCH (nestedChild)-[nestedRel:PARENT_OF*]->(child:{1}) + + // Step 6: Return the full path, resultNode, nested children, and relationships + RETURN DISTINCT + fullName, + resultNode, + COLLECT(DISTINCT nestedChild) AS nestedChildren, + COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships + """ + + # sub query for _SEARCH_BY_PROPERTY_QUERY template + _FILTER_BY_TOPIC_INCLUSION_QUERY = """// Step 4: Apply the topic filter (array of regex expressions) to include + WITH resultNode, fullName, $topicFilter AS topicFilter + WHERE ANY(regex IN topicFilter WHERE fullName =~ regex) // If Topics are to be matched + """ + # sub query for _SEARCH_BY_PROPERTY_QUERY template + _FILTER_BY_TOPIC_EXCLUSION_QUERY = """// Step 4: Apply the topic filter (array of regex expressions) to exclude + WITH resultNode, fullName, $topicFilter AS topicFilter + WHERE NONE(regex IN topicFilter WHERE fullName =~ regex) // If Topics are to be matched + """ + # + """ + Search by topic query. Parameters used in the query are + - topics - array of topics from client ( no need to convert wildcards to regex, the query will handle it ) + - labels - array of labels to match for the nodes i.e. UNS_LABEL_FILTER or SPB_LABEL_FILTER + The result of the query is a list of + - fullName: topic or path till the node, + - resultNode: the node matching the query, + - nestedChildren: all the nested children of the node of type GraphDBConfig.nested_attribute_node_type + - relationships: between resultNode and nestedChildren & between nestedChildren if more than 1 level of nesting + """ + _SEARCH_BY_TOPIC_QUERY = f"""WITH $topics AS inputPaths + UNWIND inputPaths AS inputPath + WITH split(inputPath, '/') AS nodeNames, inputPath + WITH $labels AS labels, nodeNames, range(0, size(nodeNames) - 1) AS idxRange + + // Step 1: Construct each part of the query dynamically + WITH nodeNames, labels, idxRange, + [idx IN idxRange | + CASE + // Handle the "#" wildcard + WHEN nodeNames[idx] = "#" THEN + CASE + WHEN idx = 0 THEN 'MATCH (N' + toString(idx) + ':' + labels + ')' + ELSE 'MATCH (N' + toString(idx-1)+')-[:{NODE_RELATION_NAME}*]->(N' + toString(idx) + ':' + labels + ')' + END + // Handle the "+" wildcard + WHEN nodeNames[idx] = "+" THEN + CASE + WHEN idx = 0 THEN 'MATCH (N' + toString(idx) + ':' + labels + ') WHERE NOT ()-[:{NODE_RELATION_NAME}]->(N' + toString(idx) + ')' + ELSE 'MATCH (N' + toString(idx-1)+')-[:{NODE_RELATION_NAME}]->(N' + toString(idx) + ':' + labels + ')' + END + // Handle exact node names + ELSE + CASE + WHEN idx = 0 THEN 'MATCH (N' + toString(idx) + ':' + labels + ' {{node_name: "' + nodeNames[idx] + '"}})' + ELSE 'MATCH (N' + toString(idx-1)+')-[:{NODE_RELATION_NAME}]->(N' + toString(idx) + ':' + labels + ' {{node_name: "' + nodeNames[idx] + '"}})' + END + END + ] AS queryParts + + // Step 2: Join the query parts into a full Cypher query + WITH apoc.text.join(queryParts, '') + ' RETURN N' + toString(size(nodeNames) - 1) + ' AS resultNode' AS finalQuery + + // Step 3: Execute the dynamically constructed query + CALL apoc.cypher.run(finalQuery, {{}}) YIELD value + + WITH DISTINCT value.resultNode as resultNode + // Step 4: Use APOC to find the path from each node to the root, excluding '{ GraphDBConfig.nested_attribute_node_type }' nodes + CALL apoc.path.subgraphNodes(resultNode, {{ + relationshipFilter: '{ NODE_RELATION_NAME }<', + labelFilter: '-{ GraphDBConfig.nested_attribute_node_type }', + maxLevel: -1 + }}) YIELD node AS pathNode + + // Step 5: Collect the nodes along the path and construct the full name + WITH resultNode, + COLLECT(pathNode) AS pathNodes + WITH resultNode, + REDUCE(fullPath = '', n IN pathNodes | + CASE + WHEN fullPath = '' THEN n.node_name + ELSE n.node_name + '/' + fullPath + END) AS fullName + // Step 6: Find nested children with label "{ GraphDBConfig.nested_attribute_node_type }" and their relationships + OPTIONAL MATCH (resultNode)-[r:{NODE_RELATION_NAME}]->(nestedChild:{ GraphDBConfig.nested_attribute_node_type }) + OPTIONAL MATCH (nestedChild)-[nestedRel:{NODE_RELATION_NAME}*]->(child:{ GraphDBConfig.nested_attribute_node_type }) + + // Step 7: Return the full path, resultNode, nested children, and relationships + RETURN DISTINCT + fullName, + resultNode, + COLLECT(DISTINCT nestedChild) AS nestedChildren, + COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships + """ # noqa: E501 + # + """ + Search SparkplugB namespace to get all payloads containing the provided metric name + Parameters used in the query are + - metic_names : array of metric names. case sensitive and exact match + + The result of the query is a list of + - fullName: topic or path till the node, + - resultNode: the node matching the query, + - nestedChildren: all the nested children of the node of type GraphDBConfig.nested_attribute_node_type + - relationships: between resultNode and nestedChildren & between nestedChildren if more than 1 level of nesting + """ + _SEARCH_SPB_BY_METRIC_QUERY = f"""WITH $metric_names as metric_names + UNWIND metric_names as metric_name + MATCH (resultNode:{{{SPB_LABEL_FILTER}}})-[rel:{NODE_RELATION_NAME}*{{attribute_name:"metrics"}}]-> + (:{ GraphDBConfig.nested_attribute_node_type }{{node_name:metric_name}}) + + // Step 2: Use APOC to find the path from each node to the root, + // excluding '{GraphDBConfig.nested_attribute_node_type}' nodes + CALL apoc.path.subgraphNodes(resultNode, {{ + relationshipFilter: '{NODE_RELATION_NAME}<', + labelFilter: '-{ GraphDBConfig.nested_attribute_node_type }', + maxLevel: -1 + }}) YIELD node AS pathNode + + // Step 3: Collect the nodes along the path and construct the full name + WITH resultNode, + COLLECT(pathNode) AS pathNodes + WITH resultNode, + REDUCE(fullPath = '', n IN pathNodes | + CASE + WHEN fullPath = '' THEN n.node_name + ELSE n.node_name + '/' + fullPath + END) AS fullName + + // Step 4: Find nested children with label "{ GraphDBConfig.nested_attribute_node_type }" and their relationships + + OPTIONAL MATCH (resultNode)-[r:{NODE_RELATION_NAME}]->(nestedChild:{ GraphDBConfig.nested_attribute_node_type }) + OPTIONAL MATCH (nestedChild)-[nestedRel:{NODE_RELATION_NAME}*]->(child:{ GraphDBConfig.nested_attribute_node_type }) + + // Step 5: Return the full path, resultNode, nested children, and relationships + RETURN DISTINCT + fullName, + resultNode, + COLLECT(DISTINCT nestedChild) AS nestedChildren, + COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships + """ + @strawberry.field(description="Get consolidation of nodes for given array of topics. MQTT wildcards are supported") - def get_uns_nodes( + async def get_uns_nodes( self, - topics: list[MQTTTopicInput], + mqtt_topics: list[MQTTTopicInput], ) -> list[UNSNode]: - LOGGER.debug("Query for Nodes in UNS with Params :\n" f"topics={topics}") - if type(topics) is not list: - # convert single topic to array for consistent handling - topics = [topics] - # TBD + LOGGER.debug("Query for Nodes in UNS with Params :\n" f"topics={mqtt_topics}") + if type(mqtt_topics) is not list: + # convert single topic to array for consistent handling. Done need to convert to regex + mqtt_topics = [mqtt_topics] + # Initialize the GraphDB + graph_db = GraphDB() + results: list[Record] = await graph_db.execute_query( + query=self._SEARCH_BY_TOPIC_QUERY, + topics=[mqtt_topic.topic for mqtt_topic in mqtt_topics], + labels=self.UNS_LABEL_FILTER, + ) + uns_node_list: list[UNSNode] = [] + for record in results: + topic: str = record["fullName"] + node: Node = record["resultNode"] + child_nodes: list[Node] = record["nestedChildren"] + relationships: list[Relationship] = record["relationships"] + + if node[MODIFIED_TIMESTAMP_KEY]: + modified_timestamp = datetime.fromtimestamp(node[MODIFIED_TIMESTAMP_KEY] / 1000, UTC) + else: + # if the DB doesn't have any value, then created and modified timestamps are the same + modified_timestamp = datetime.fromtimestamp(node[CREATED_TIMESTAMP_KEY] / 1000, UTC) + + uns_node: UNSNode = UNSNode( + node_name=node[NODE_NAME_KEY], + # As the node can have multiple labels extract only those which are of UNS Node types + node_type=self.get_node_type(list(node.labels), GraphDBConfig.uns_node_types), + namespace=topic, + payload=self.get_nested_properties(node, child_nodes, relationships), + created=datetime.fromtimestamp(node[CREATED_TIMESTAMP_KEY] / 1000, UTC), + last_updated=modified_timestamp, + ) + uns_node_list.append(uns_node) + return uns_node_list @strawberry.field( - description="Get all UNSNodes published which have specific attributes." - "Option binary_operator input allows chaining the list of property_keys. If NULL, property_keys will be ORed" - "- OR: Either of the property_keys must exist in the same node. If only one property_keys provided will be ignored" - "- AND: All property_keys must exist in same node. If only one property_keys provided will be ignored" - "- NOT: None of the provided property_keys should exist in the same node" - "Other criteria - topics will always be ANDed to the query filter" + description="Get all UNSNodes published which have specific attribute name as 'property_keys'. \n " + "Optionally Filter results with list of topics. \n" + "If topics are provided then optional boolean value exclude_topics attribute can be set to true .\n" + "to exclude the nodes which match the topics.\n" ) - def get_uns_nodes_by_property( + async def get_uns_nodes_by_property( self, property_keys: list[str], - binary_operator: Optional[BinaryOperator] = strawberry.UNSET, topics: Optional[list[MQTTTopicInput]] = strawberry.UNSET, + exclude_topics: Optional[bool] = False, ) -> list[UNSNode]: LOGGER.debug( "Query for historic events by properties, with params :\n" - f"property_keys={property_keys}, binary_operator={binary_operator}, topics={topics}," + f"property_keys={property_keys}, topics={topics}, exclude_topics={exclude_topics}" ) - if type(topics) is not list: + if topics is None or topics == strawberry.UNSET: + topics = [] + elif type(topics) is not list: # convert single topic to array for consistent handling topics = [topics] - # TBD - @strawberry.field( - description="Get consolidation of sparkplugB nodes. MQTT wildcards are supported" - "Ensure that the topics are all in the SparkplugB namespace i.e. starts with 'spBv1.0/' " - "Format of an SpB topic is spBv1.0/ / / / []" - ) - def get_spb_nodes( - self, - topics: list[MQTTTopicInput], - ) -> list[UNSNode]: - LOGGER.debug("Query for Nodes in SpB with Params :\n" f"topics={topics}") - if type(topics) is not list: - # convert single topic to array for consistent handling - topics = [topics] - # TBD + if type(property_keys) is not list: + property_keys = [property_keys] + + topic_regex_list: list[str] = [UnsMQTTClient.get_regex_for_topic_with_wildcard(topic.topic) for topic in topics] + + topic_sub_query = None + if len(topic_regex_list) == 0: + topic_sub_query = "" + elif exclude_topics: + topic_sub_query = Query._FILTER_BY_TOPIC_EXCLUSION_QUERY + else: + topic_sub_query = Query._FILTER_BY_TOPIC_INCLUSION_QUERY + + final_query = Query._SEARCH_BY_PROPERTY_QUERY.format( + Query.UNS_LABEL_FILTER, GraphDBConfig.nested_attribute_node_type, topic_sub_query + ) + # Initialize the GraphDB + graph_db = GraphDB() + results: list[Record] = await graph_db.execute_query( + query=final_query, propertyNames=property_keys, topicFilter=topic_regex_list + ) + uns_node_list: list[UNSNode] = [] + for record in results: + topic: str = record["fullName"] + node: Node = record["resultNode"] + child_nodes: list[Node] = record["nestedChildren"] + relationships: list[Relationship] = record["relationships"] + + if node[MODIFIED_TIMESTAMP_KEY]: + modified_timestamp = datetime.fromtimestamp(node[MODIFIED_TIMESTAMP_KEY] / 1000, UTC) + else: + # if the DB doesn't have any value, then created and modified timestamps are the same + modified_timestamp = datetime.fromtimestamp(node[CREATED_TIMESTAMP_KEY] / 1000, UTC) + + uns_node: UNSNode = UNSNode( + node_name=node[NODE_NAME_KEY], + # As the node can have multiple labels extract only those which are of UNS Node types + node_type=self.get_node_type(list(node.labels), GraphDBConfig.uns_node_types), + namespace=topic, + payload=self.get_nested_properties(node, child_nodes, relationships), + created=datetime.fromtimestamp(node[CREATED_TIMESTAMP_KEY] / 1000, UTC), + last_updated=modified_timestamp, + ) + uns_node_list.append(uns_node) + return uns_node_list + + @strawberry.field(description="Get all the SPBNode by the provided metric name") + async def get_spb_nodes_by_metric(self, metric_names: list[str]) -> list[SPBNode]: + """ """ + LOGGER.debug("Query for Nodes in SpB with Params :\n" f"topics={metric_names}") + if type(metric_names) is not list: + # convert single topic to array for consistent handling. Done need to convert to regex + metric_names = [metric_names] + # Initialize the GraphDB + graph_db = GraphDB() + results: list[Record] = await graph_db.execute_query(query=self._SEARCH_SPB_BY_METRIC_QUERY, metric_names=metric_names) + spb_metric_nodes: list[SPBNode] = [] + for record in results: + topic: str = record["fullName"] + node: Node = record["resultNode"] + child_nodes: list[Node] = record["nestedChildren"] + relationships: list[Relationship] = record["relationships"] + + spb_metric = SPBNode( + topic=topic, + payload=self.get_nested_properties(node, child_nodes, relationships), + ) + spb_metric_nodes.append(spb_metric) + + return spb_metric_nodes @classmethod async def on_shutdown(cls): @@ -92,3 +393,90 @@ async def on_shutdown(cls): Clean up Db connection pool """ await GraphDB.release_graphdb_driver() + + @classmethod + def get_node_type(cls, labels: list[str], valid_labels: tuple[str]) -> str: + """ + Compares teh labels on a node with the valid set of label types and returns the one which matches + this allows the nodes to have multiple labels in the future + """ + for label in labels: + if label in valid_labels: + return label + + @classmethod + def get_nested_properties(cls, parent: Node, nested_children: list[Node], relationships: list[Relationship]) -> dict: # noqa: C901 + """ + Retrieves nested properties for a given parent node by merging the nested chid + + Args: + parent: The parent node. + nested_children (list): A list of child node types + relationships (list): A list of relationships to traverse. + + Returns: + dict : All child nodes converted into child dicts. + The returned dict can be merged into the properties of parent node + """ + if parent is None: + raise ValueError("parent cannot be null") + if nested_children is None: + nested_children = [] + if relationships is None: + relationships = [] + # map element id uniquely identifying a node to the corresponding properties map + node_to_prop: dict[str, dict] = {} + + for node in [parent, *nested_children]: + # loop through all the nodes to create the corresponding dict objects + nested_properties: dict[str, Any] = { + # filter out special properties added to the neo4j node which were not present in the original payload + key: value + for key, value in node.items() + if key not in [NODE_NAME_KEY, CREATED_TIMESTAMP_KEY, MODIFIED_TIMESTAMP_KEY] + } + + # logic for handling changing key NODE_NAME to lower case which was done while persisting + if nested_properties.get(NODE_NAME_KEY.upper()): + nested_properties[NODE_NAME_KEY] = nested_properties.pop(NODE_NAME_KEY.upper()) + + # save those objects against the element id for later retrieval + node_to_prop[node.element_id] = nested_properties + + # find max index for nested lists ( key (parent_id, attribute_name)) + map_list_size: dict[tuple, int] = {} + for relation in relationships: + if relation.type != NODE_RELATION_NAME or relation[REL_ATTR_TYPE] != "list": + continue # ensure that only relevant relations are processed + + attr_name = relation[REL_ATTR_KEY] + size: int = map_list_size.get((relation.nodes[0].element_id, attr_name), 0) + index = int(relation[REL_INDEX]) + if size - 1 < index: + map_list_size[relation.nodes[0].element_id, attr_name] = index + 1 + + for relation in relationships: + # in a relationship relation.nodes[0] is the parent and relation.nodes[1] is the child + # get the the node_properties based on the element id + if relation.type != NODE_RELATION_NAME: + continue # ensure that only relevant relations are processed + parent_property_map: dict = node_to_prop[relation.nodes[0].element_id] + child_property_map: dict = node_to_prop[relation.nodes[1].element_id] + attr_name: str = relation[REL_ATTR_KEY] + attr_type: str = relation[REL_ATTR_TYPE] # will be either dict or list + + match attr_type: + case "dict": + # get the properties of the child and merge it into parent_property_map + parent_property_map[attr_name] = child_property_map + + case "list": + index = int(relation[REL_INDEX]) # will be null for dict, list index for list + dict_size = map_list_size.get((relation.nodes[0].element_id, attr_name)) + child_list: list = parent_property_map.get(attr_name, [None] * dict_size) + + # update the list map at correct index + child_list[index] = child_property_map + parent_property_map[attr_name] = child_list + + return node_to_prop[parent.element_id] diff --git a/07_uns_graphql/src/uns_graphql/queries/queries.cypher b/07_uns_graphql/src/uns_graphql/queries/queries.cypher index 14c72ad4..d852d4e6 100644 --- a/07_uns_graphql/src/uns_graphql/queries/queries.cypher +++ b/07_uns_graphql/src/uns_graphql/queries/queries.cypher @@ -1,17 +1,18 @@ +// spBv1_0|GROUP|MESSAGE_TYPE|EDGE_NODE|DEVICE <= add as label filter to simple_node and nested_node +// ENTERPRISE|FACILITY|AREA|LINE|DEVICE<= add as label filter to simple_node and nested_node + +WITH $propertyNames AS propertyNames +UNWIND propertyNames AS propertyName // Step 1: Find all nodes containing the specified property -// Use a subquery to handle both MATCH conditions -CALL { - WITH $propertyNames AS propertyNames - UNWIND propertyNames AS propertyName +// Use a sub query to handle both MATCH conditions +CALL (propertyName) { // Match nodes that directly contain the specified property - MATCH (simple_node) + MATCH (simple_node) // dynamically add the label filter here WHERE simple_node[propertyName] IS NOT NULL RETURN DISTINCT simple_node AS resultNode UNION - WITH $propertyNames AS propertyNames - UNWIND propertyNames AS propertyName // Match nodes that are related via a specific relationship property - MATCH (nested_node)-[r:PARENT_OF {attribute_name: propertyName}]->(:NESTED_ATTRIBUTE) + MATCH (nested_node)-[r:PARENT_OF {attribute_name: propertyName}]->(:NESTED_ATTRIBUTE) // dynamically add the label filter here WHERE r.type IN ["list", "dict"] RETURN DISTINCT nested_node AS resultNode } @@ -32,73 +33,124 @@ WITH resultNode, WHEN fullPath = '' THEN n.node_name ELSE n.node_name + '/' + fullPath END) AS fullName -// Step 4: Find nested children with label "NESTED_ATTRIBUTE" and their relationships +// Step 4: Apply the topic filter (array of regex expressions) +WITH resultNode, fullName, $topicFilter AS topicFilter +WHERE ANY(regex IN topicFilter WHERE fullName =~ regex) // If Topics are to be matched +//WHERE NONE(regex IN topicFilter WHERE fullName =~ regex) // <= If Topics are to be excluded +// Step 5: Find nested children with label "NESTED_ATTRIBUTE" and their relationships + OPTIONAL MATCH (resultNode)-[r:PARENT_OF]->(nestedChild:NESTED_ATTRIBUTE) OPTIONAL MATCH (nestedChild)-[nestedRel:PARENT_OF*]->(child:NESTED_ATTRIBUTE) -// Step 5: Return the full path, resultNode, nested children, and relationships +// Step 6: Return the full path, resultNode, nested children, and relationships RETURN DISTINCT fullName, resultNode, COLLECT(DISTINCT nestedChild) AS nestedChildren, COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships ------------------ ----------------- -// Get node based on topics. Does not work for # and + entries. need atleast one + +// query for getting the node based on the topics. supports wildcards WITH $topics AS inputPaths UNWIND inputPaths AS inputPath WITH split(inputPath, '/') AS nodeNames, inputPath +WITH $labels AS labels, nodeNames, range(0, size(nodeNames) - 1) AS idxRange + +// Step 1: Construct each part of the query dynamically +WITH nodeNames, labels, idxRange, + [idx IN idxRange | + CASE + // Handle the "#" wildcard + WHEN nodeNames[idx] = "#" THEN + CASE + WHEN idx = 0 THEN 'MATCH (N' + toString(idx) + ':' + labels + ')' + ELSE 'MATCH (N' + toString(idx-1)+')-[:PARENT_OF*]->(N' + toString(idx) + ':' + labels + ')' + END + // Handle the "+" wildcard + WHEN nodeNames[idx] = "+" THEN + CASE + WHEN idx = 0 THEN 'MATCH (N' + toString(idx) + ':' + labels + ') WHERE NOT ()-[:PARENT_OF]->(N' + toString(idx) + ')' + ELSE 'MATCH (N' + toString(idx-1)+')-[:PARENT_OF]->(N' + toString(idx) + ':' + labels + ')' + END + // Handle exact node names + ELSE + CASE + WHEN idx = 0 THEN 'MATCH (N' + toString(idx) + ':' + labels + ' {node_name: "' + nodeNames[idx] + '"})' + ELSE 'MATCH (N' + toString(idx-1)+')-[:PARENT_OF]->(N' + toString(idx) + ':' + labels + ' {node_name: "' + nodeNames[idx] + '"})' + END + END + ] AS queryParts + -// Step 1: Match the root node based on the first part of the path -MATCH (root {node_name: nodeNames[0]}) - -// Step 2: Expand paths from the root node -CALL apoc.path.expand( - root, - 'PARENT_OF>', - '', // No label filter - 0, // Min depth 0 to include the root node - -1 // Max depth is unlimited to cover all possible depths -) YIELD path AS fullPath - -// Step 3: Filter paths according to the inputPath with wildcards -WITH fullPath, nodeNames, nodes(fullPath) AS nodesPath -WITH fullPath, nodeNames, nodesPath, last(nodes(fullPath)) AS lastNode -WHERE - size(nodesPath) >= size(nodeNames) AND - all(i IN range(0, size(nodeNames) - 1) WHERE - (nodeNames[i] = '+' AND size(nodesPath) > i) OR - (nodeNames[i] = '#' AND size(nodesPath) > i) OR - (nodeNames[i] = nodesPath[i].node_name) - ) AND - (size(nodeNames) = size(nodesPath) OR nodeNames[-1] = '#' OR nodeNames[-1] = lastNode.node_name) - -// Step 4: Construct the MQTT topic path from the matched path -WITH fullPath, lastNode, reduce(fullPathStr = '', n IN nodesPath | + +// Step 2: Join the query parts into a full Cypher query +WITH apoc.text.join(queryParts, '') + ' RETURN N' + toString(size(nodeNames) - 1) + ' AS resultNode' AS finalQuery + +// Step 3: Execute the dynamically constructed query +CALL apoc.cypher.run(finalQuery, {}) YIELD value + +WITH DISTINCT value.resultNode as resultNode +// Step 4: Use APOC to find the path from each node to the root, excluding 'NESTED_ATTRIBUTE' nodes +CALL apoc.path.subgraphNodes(resultNode, { + relationshipFilter: 'PARENT_OF<', + labelFilter: '-NESTED_ATTRIBUTE', + maxLevel: -1 +}) YIELD node AS pathNode + +// Step 5: Collect the nodes along the path and construct the full name +WITH resultNode, + COLLECT(pathNode) AS pathNodes +WITH resultNode, + REDUCE(fullPath = '', n IN pathNodes | CASE - WHEN fullPathStr = '' THEN n.node_name - ELSE fullPathStr + '/' + n.node_name - END) AS mqttTopic + WHEN fullPath = '' THEN n.node_name + ELSE n.node_name + '/' + fullPath + END) AS fullName +// Step 6: Find nested children with label "NESTED_ATTRIBUTE" and their relationships +OPTIONAL MATCH (resultNode)-[r:PARENT_OF]->(nestedChild:NESTED_ATTRIBUTE) +OPTIONAL MATCH (nestedChild)-[nestedRel:PARENT_OF*]->(child:NESTED_ATTRIBUTE) -// Step 5: Exclude paths containing 'NESTED_ATTRIBUTE' nodes in the path -WHERE NOT any(n IN nodesPath WHERE 'NESTED_ATTRIBUTE' IN labels(n)) +// Step 7: Return the full path, resultNode, nested children, and relationships +RETURN DISTINCT + fullName, + resultNode, + COLLECT(DISTINCT nestedChild) AS nestedChildren, + COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships + + + ============ + //Query to search SPB Node by Metrics +WITH $metric_names as metric_names +UNWIND metric_names as metric_name +MATCH (resultNode)-[rel:PARENT_OF*{attribute_name:"metrics"}]->(:NESTED_ATTRIBUTE{node_name:metric_name}) + +// Step 2: Use APOC to find the path from each node to the root, excluding 'NESTED_ATTRIBUTE' nodes +CALL apoc.path.subgraphNodes(resultNode, { + relationshipFilter: 'PARENT_OF<', + labelFilter: '-NESTED_ATTRIBUTE', + maxLevel: -1 +}) YIELD node AS pathNode + +// Step 3: Collect the nodes along the path and construct the full name +WITH resultNode, + COLLECT(pathNode) AS pathNodes +WITH resultNode, + REDUCE(fullPath = '', n IN pathNodes | + CASE + WHEN fullPath = '' THEN n.node_name + ELSE n.node_name + '/' + fullPath + END) AS fullName -// Step 6: Match directly connected nested children of the last node that have the label 'NESTED_ATTRIBUTE' -OPTIONAL MATCH (lastNode)-[r:PARENT_OF]->(nestedChild:NESTED_ATTRIBUTE) +// Step 4: Find nested children with label "NESTED_ATTRIBUTE" and their relationships -// Step 7: Match any children of the nestedChild if they have the label "NESTED_ATTRIBUTE" +OPTIONAL MATCH (resultNode)-[r:PARENT_OF]->(nestedChild:NESTED_ATTRIBUTE) OPTIONAL MATCH (nestedChild)-[nestedRel:PARENT_OF*]->(child:NESTED_ATTRIBUTE) -// Step 8: Collect results -WITH DISTINCT mqttTopic, lastNode, - COLLECT(DISTINCT nestedChild) AS nestedChildren, - COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships - -// Step 9: Return the MQTT topic path, the last node, nested children, and relationships -RETURN - mqttTopic, - lastNode as node, - nestedChildren, - relationships +// Step 5: Return the full path, resultNode, nested children, and relationships +RETURN DISTINCT + fullName, + resultNode, + COLLECT(DISTINCT nestedChild) AS nestedChildren, + COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships \ No newline at end of file diff --git a/07_uns_graphql/test/backend/test_graphdb.py b/07_uns_graphql/test/backend/test_graphdb.py index c6c58159..2a1ae60d 100644 --- a/07_uns_graphql/test/backend/test_graphdb.py +++ b/07_uns_graphql/test/backend/test_graphdb.py @@ -25,6 +25,56 @@ from uns_graphql.backend.graphdb import GraphDB from uns_graphql.graphql_config import GraphDBConfig +QUERY = """ + WITH $propertyNames AS propertyNames + UNWIND propertyNames AS propertyName + // Step 1: Find all nodes containing the specified property + // Use a sub query to handle both MATCH conditions + CALL (propertyName) { + // Match nodes that directly contain the specified property + MATCH (simple_node) // dynamically add the label filter here + WHERE simple_node[propertyName] IS NOT NULL + RETURN DISTINCT simple_node AS resultNode + UNION + // Match nodes that are related via a specific relationship property + MATCH (nested_node)-[r:PARENT_OF {attribute_name: propertyName}]->(:NESTED_ATTRIBUTE) + WHERE r.type IN ["list", "dict"] + RETURN DISTINCT nested_node AS resultNode + } + + // Step 2: Use APOC to find the path from each node to the root, excluding 'NESTED_ATTRIBUTE' nodes + CALL apoc.path.subgraphNodes(resultNode, { + relationshipFilter: 'PARENT_OF<', + labelFilter: '-NESTED_ATTRIBUTE', + maxLevel: -1 + }) YIELD node AS pathNode + + // Step 3: Collect the nodes along the path and construct the full name + WITH resultNode, + COLLECT(pathNode) AS pathNodes + WITH resultNode, + REDUCE(fullPath = '', n IN pathNodes | + CASE + WHEN fullPath = '' THEN n.node_name + ELSE n.node_name + '/' + fullPath + END) AS fullName + // Step 4: Apply the topic filter (array of regex expressions) + WITH resultNode, fullName, $topicFilter AS topicFilter + WHERE ANY(regex IN topicFilter WHERE fullName =~ regex) + + // Step 5: Find nested children with label "NESTED_ATTRIBUTE" and their relationships + OPTIONAL MATCH (resultNode)-[r:PARENT_OF]->(nestedChild:NESTED_ATTRIBUTE) + OPTIONAL MATCH (nestedChild)-[nestedRel:PARENT_OF*]->(child:NESTED_ATTRIBUTE) + + // Step 6: Return the full path, resultNode, nested children, and relationships + RETURN DISTINCT + fullName, + resultNode, + COLLECT(DISTINCT nestedChild) AS nestedChildren, + COLLECT(DISTINCT r) + COLLECT(DISTINCT nestedRel) AS relationships + """ +QUERY_PARAMS = {"propertyNames": ["seq", "dict_list"], "topicFilter": ["(.)*"]} + @pytest_asyncio.fixture async def mock_graphdb_driver(): @@ -39,7 +89,7 @@ async def mock_graphdb_driver(): @pytest.mark.asyncio async def test_get_graphdb_driver(mock_config, mock_driver_class, mock_graphdb_driver): """ - Test with mock object to validate singulatiry of the neo4j driver + Test with mock object to validate singularity of the neo4j driver """ mock_driver_class.return_value = mock_graphdb_driver mock_config.conn_url = GraphDBConfig.conn_url @@ -59,7 +109,7 @@ async def test_get_graphdb_driver(mock_config, mock_driver_class, mock_graphdb_d driver2 = await GraphDB.get_graphdb_driver() mock_driver_class.assert_called_once() # Should still be called only once - assert driver1 == driver2, "The driver instace should be same befor releasing" + assert driver1 is driver2, "The driver instance should be same before releasing" finally: await GraphDB.release_graphdb_driver() @@ -68,7 +118,7 @@ async def test_get_graphdb_driver(mock_config, mock_driver_class, mock_graphdb_d @pytest.mark.asyncio async def test_release_graphdb_driver(mock_graphdb_driver): """ - Validatres that the driver was closed + Validates that the driver was closed """ await GraphDB.release_graphdb_driver() mock_graphdb_driver.close.assert_called_once() @@ -82,9 +132,10 @@ async def test_release_graphdb_driver(mock_graphdb_driver): ("MATCH (n:NOMAD {name: 'IamNotFound'}) RETURN n", None, None, False), # Valid query but empty result, no params ("MATCH (n) RETURN n", None, None, False), # Valid query valid results ("CREATE (n:NEW_NODE) RETURN n", None, None, False), # Valid insert valid results + (QUERY, None, QUERY_PARAMS, False), ], ) -async def test_execute_read_query( +async def test_execute_query( query: str, args: tuple, kwargs: dict, @@ -96,10 +147,18 @@ async def test_execute_read_query( # Initialize the GraphDB graph_db = GraphDB() try: - result = await graph_db.execute_query(query=query, args=args, kwargs=kwargs) + # Pass args and kwargs only if they are not None + if args is None and kwargs is None: + result = await graph_db.execute_query(query) + elif args is None: + result = await graph_db.execute_query(query, **kwargs) + elif kwargs is None: + result = await graph_db.execute_query(query, *args) + else: + result = await graph_db.execute_query(query, *args, **kwargs) assert result is not None except Exception as ex: if is_error: assert True # Error was expected else: - pytest.fail(f"Exception {ex} occurred while executing quwery: {query} ") + pytest.fail(f"Exception {ex} occurred while executing query: {query} ") diff --git a/07_uns_graphql/test/queries/test_graph.py b/07_uns_graphql/test/queries/test_graph.py index 6e0a5f29..3a65c9e1 100644 --- a/07_uns_graphql/test/queries/test_graph.py +++ b/07_uns_graphql/test/queries/test_graph.py @@ -17,14 +17,357 @@ Test cases for uns_graphql.queries.graph.Query """ +import asyncio +from datetime import UTC, datetime from pathlib import Path +from unittest.mock import MagicMock, patch +import pytest import pytest_asyncio + +# import strawberry +# import strawberry.tools +from neo4j import Record +from neo4j.graph import Node, Relationship from uns_graphql.backend.graphdb import GraphDB +from uns_graphql.graphql_config import GraphDBConfig +from uns_graphql.input.mqtt import MQTTTopic, MQTTTopicInput +from uns_graphql.queries.graph import NODE_RELATION_NAME, REL_ATTR_KEY, REL_ATTR_TYPE, REL_INDEX +from uns_graphql.queries.graph import Query as GraphQuery +from uns_graphql.type.isa95_node import UNSNode + +main_node = MagicMock(spec=Node, autospec=True) +main_node.element_id = "main_node_id" +main_node.labels = frozenset({"LINE"}) +main_node.items.return_value = { + "node_name": "ln4", + "_created_timestamp": 1486144500000, + "_modified_timestamp": 1486144510000, + "TestMetric2": "Test_UNS_with_NestedLists", + "timestamp": 1486144510000, +}.items() + +uns_child_1 = MagicMock(spec=Node, autospec=True) +uns_child_1.element_id = "nested_child_1" +uns_child_1.labels = frozenset({"NESTED_ATTRIBUTE", "TEST_MULTIPLE_LABELS"}) +uns_child_1.items.return_value = {"node_name": "dict_list_1", "_created_timestamp": 1486144500000, "x": "y"}.items() + + +uns_child_2 = MagicMock(spec=Node, autospec=True) +uns_child_2.element_id = "nested_child_2" +uns_child_2.labels = frozenset({"NESTED_ATTRIBUTE"}) +uns_child_2.items.return_value = {"node_name": "dict_list_0", "_created_timestamp": 1486144500000, "a": "b"}.items() + +uns_child_3 = MagicMock(spec=Node, autospec=True) +uns_child_3.element_id = "nested_child_3" +uns_child_3.labels = frozenset({"NESTED_ATTRIBUTE"}) +uns_child_3.items.return_value = { + "node_name": "nested", + "_created_timestamp": 1486144500000, + "nest 1": "nest val 1", +}.items() + + +uns_rel_1 = MagicMock(spec=Relationship, autospec=True) +uns_rel_1.element_id = "relationship_1 main_node->nested_child_1" +uns_rel_1.nodes = [main_node, uns_child_1] +uns_rel_1.type = NODE_RELATION_NAME +uns_rel_1.__getitem__.side_effect = ( + lambda key: "dict_list" if key == REL_ATTR_KEY else "1" if key == REL_INDEX else "list" if key == REL_ATTR_TYPE else None +) + +uns_rel_2 = MagicMock(spec=Relationship, autospec=True) +uns_rel_2.element_id = "relationship_1 main_node->nested_child_2" +uns_rel_2.nodes = [main_node, uns_child_2] +uns_rel_2.type = NODE_RELATION_NAME +uns_rel_2.__getitem__.side_effect = ( + lambda key: "dict_list" if key == REL_ATTR_KEY else "0" if key == REL_INDEX else "list" if key == REL_ATTR_TYPE else None +) + +uns_rel_3 = MagicMock(spec=Relationship, autospec=True) +uns_rel_3.element_id = "relationship_1 main_node->nested_child_3" +uns_rel_3.nodes = [main_node, uns_child_3] +uns_rel_3.type = NODE_RELATION_NAME +uns_rel_3.__getitem__.side_effect = ( + lambda key: "nested_dict" if key == REL_ATTR_KEY else "dict" if key == REL_ATTR_TYPE else None +) + +uns_result: list[Record] = [ + Record( + { + "fullName": "test/uns/ar2/ln4", + "resultNode": main_node, + "nestedChildren": [uns_child_1, uns_child_2, uns_child_3], + "relationships": [uns_rel_1, uns_rel_2, uns_rel_3], + } + ) +] +uns_dict_result = { + "TestMetric2": "Test_UNS_with_NestedLists", + "timestamp": 1486144510000, + "nested_dict": {"nest 1": "nest val 1"}, + "dict_list": [{"a": "b"}, {"x": "y"}], +} + +sbp_node = MagicMock(spec=Node, autospec=True) +sbp_node.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:657" +sbp_node.labels = frozenset({"EDGE_NODE"}) +sbp_node.items.return_value = { + "_created_timestamp": 1671554024644, + "node_name": "eon1", + "seq": 0, + "timestamp": 1671554024644, +}.items() + +spb_child_1 = MagicMock(spec=Node, autospec=True) +spb_child_1.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:800" +spb_child_1.labels = frozenset({"NESTED_ATTRIBUTE"}) +spb_child_1.items.return_value = { + "datatype": 11, + "_created_timestamp": 1671554024644, + "node_name": "Outputs/F", + "name": "Outputs/F", + "alias": 3, + "value": False, + "timestamp": 1486144502122, +}.items() + +spb_child_2 = MagicMock(spec=Node, autospec=True) +spb_child_2.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:900" +spb_child_2.labels = frozenset({"NESTED_ATTRIBUTE"}) +spb_child_2.items.return_value = { + "datatype": 11, + "_created_timestamp": 1671554024644, + "node_name": "Inputs/B", + "name": "Inputs/B", + "alias": 1, + "value": False, + "timestamp": 1486144502122, +}.items() + +spb_child_3 = MagicMock(spec=Node, autospec=True) +spb_child_3.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:950" +spb_child_3.labels = frozenset({"NESTED_ATTRIBUTE"}) +spb_child_3.items.return_value = { + "datatype": 11, + "_created_timestamp": 1671554024644, + "node_name": "Inputs/A", + "name": "Inputs/A", + "alias": 0, + "value": False, + "timestamp": 1486144502122, +}.items() + +spb_rel_1 = MagicMock(spec=Relationship, autospec=True) +spb_rel_1.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:703" +spb_rel_1.nodes = [sbp_node, spb_child_1] +spb_rel_1.type = NODE_RELATION_NAME +spb_rel_1.__getitem__.side_effect = ( + lambda key: "metrics" if key == REL_ATTR_KEY else "2" if key == REL_INDEX else "list" if key == REL_ATTR_TYPE else None +) + +spb_rel_2 = MagicMock(spec=Relationship, autospec=True) +spb_rel_2.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:702" +spb_rel_2.nodes = [sbp_node, spb_child_2] +spb_rel_2.type = NODE_RELATION_NAME +spb_rel_2.__getitem__.side_effect = ( + lambda key: "metrics" if key == REL_ATTR_KEY else "1" if key == REL_INDEX else "list" if key == REL_ATTR_TYPE else None +) +spb_rel_3 = MagicMock(spec=Relationship, autospec=True) +spb_rel_3.element_id = "4:957eb9e7-0a52-4e69-975c-087ce8dcb4c4:701" +spb_rel_3.nodes = [sbp_node, spb_child_3] +spb_rel_3.type = NODE_RELATION_NAME +spb_rel_3.__getitem__.side_effect = ( + lambda key: "metrics" if key == REL_ATTR_KEY else "0" if key == REL_INDEX else "list" if key == REL_ATTR_TYPE else None +) + +spb_result: list[Record] = [ + Record( + { + "fullName": "spBv1.0/uns_group/NDATA/eon1", + "resultNode": sbp_node, + "nestedChildren": [spb_child_1, spb_child_2, spb_child_3], + "relationships": [spb_rel_1, spb_rel_2, spb_rel_3], + } + ) +] +spb_result_dict: dict = { + "seq": 0, + "timestamp": 1671554024644, + "metrics": [ + { + "datatype": 11, + "name": "Inputs/A", + "alias": 0, + "value": False, + "timestamp": 1486144502122, + }, + { + "datatype": 11, + "name": "Inputs/B", + "alias": 1, + "value": False, + "timestamp": 1486144502122, + }, + { + "datatype": 11, + "name": "Outputs/F", + "alias": 3, + "value": False, + "timestamp": 1486144502122, + }, + ], +} + +# Mock the datahandler for UNS queries +mocked_uns_graphdb = MagicMock(spec=GraphDB, autospec=True) +# Mocking all the query functions to give the same result +mocked_uns_graphdb.execute_query.return_value = uns_result + +# Mock the datahandler for SPB queries +mocked_spb_graphdb = MagicMock(spec=GraphDB, autospec=True) +# Mocking all the query functions to give the same result +mocked_spb_graphdb.execute_query.return_value = spb_result + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "topics,has_result_errors", + [ + (["topic1/#"], False), + (["topic1/+"], False), + (["topic1/testString"], False), + (["#"], False), + (["topic1/#", "topic3"], False), + (["topic1/subtopic1", "topic2/topic3"], False), + (["+"], False), + ], +) +async def test_get_uns_nodes( + topics: list[str], + has_result_errors: bool, +): + mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] + with patch("uns_graphql.queries.graph.GraphDB", return_value=mocked_uns_graphdb): + graph_query = GraphQuery() + try: + result = await graph_query.get_uns_nodes(mqtt_topics=mqtt_topic_list) + except Exception as ex: + assert has_result_errors, f"Should not throw any exceptions. Got {ex}" + assert result is not None # test was successful + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "property_keys, topics, exclude_topics, has_result_errors", + [ + (["prop1"], ["topic1/#"], False, False), + ("prop1", ["topic1/#"], False, False), + (["prop1", "prop2"], ["topic1/+"], True, False), + (["124"], None, None, False), + (["prop1", "prop2"], ["topic1/subtopic"], True, False), + (["prop1", "prop2"], ["topic1/#", "topic3"], True, False), + (["prop1", "prop2"], ["+"], True, False), + ], +) +async def test_get_uns_nodes_by_property( + property_keys, + topics: list[str], + exclude_topics, + has_result_errors: bool, +): + mqtt_topic_list = None + if topics is not None: + mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] + with patch("uns_graphql.queries.graph.GraphDB", return_value=mocked_uns_graphdb): + graph_query = GraphQuery() + try: + result = await graph_query.get_uns_nodes_by_property( + property_keys=property_keys, topics=mqtt_topic_list, exclude_topics=exclude_topics + ) + except Exception as ex: + assert has_result_errors, f"Should not throw any exceptions. Got {ex}" + assert result is not None # test was successful + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "metric_names", + [(["Inputs/A"]), ("Inputs/A"), (["Inputs/A", "Inputs/B"]), (["Output/F"]), ([]), (None)], +) +async def test_get_spb_nodes_by_metric(metric_names: list[str]): + with patch("uns_graphql.queries.graph.GraphDB", return_value=mocked_spb_graphdb): + graph_query = GraphQuery() + try: + result = await graph_query.get_spb_nodes_by_metric(metric_names=metric_names) + except Exception as ex: + pytest.fail(f"Should not throw any exceptions. Got {ex}") + assert result is not None # test was successful + + +@pytest.mark.parametrize( + "labels, valid_labels, expected_result ", + [ + (["ENTERPRISE", "test1"], GraphDBConfig.uns_node_types, "ENTERPRISE"), + (["FACILITY", "test1"], GraphDBConfig.uns_node_types, "FACILITY"), + (["AREA", "test1"], GraphDBConfig.uns_node_types, "AREA"), + (["LINE", "test1"], GraphDBConfig.uns_node_types, "LINE"), + (["DEVICE", "test1"], GraphDBConfig.uns_node_types, "DEVICE"), + (["NOT_LISTED_1", "NOT_LISTED_2"], GraphDBConfig.uns_node_types, None), + (["ENTERPRISE"], GraphDBConfig.uns_node_types, "ENTERPRISE"), + (["FACILITY"], GraphDBConfig.uns_node_types, "FACILITY"), + (["AREA"], GraphDBConfig.uns_node_types, "AREA"), + (["LINE"], GraphDBConfig.uns_node_types, "LINE"), + (["DEVICE"], GraphDBConfig.uns_node_types, "DEVICE"), + (["NOT_LISTED"], GraphDBConfig.uns_node_types, None), + (["spBv1_0"], (*GraphDBConfig.spb_node_types, "STATE"), "spBv1_0"), + (["GROUP"], (*GraphDBConfig.spb_node_types, "STATE"), "GROUP"), + (["MESSAGE_TYPE"], (*GraphDBConfig.spb_node_types, "STATE"), "MESSAGE_TYPE"), + (["EDGE_NODE"], (*GraphDBConfig.spb_node_types, "STATE"), "EDGE_NODE"), + (["DEVICE"], (*GraphDBConfig.spb_node_types, "STATE"), "DEVICE"), + ], +) +def test_get_node_type(labels: list[str], valid_labels: tuple[str], expected_result: str): + """ + Test GraphQuery.get_node_type() + """ + result = GraphQuery.get_node_type(labels=labels, valid_labels=valid_labels) + assert result == expected_result + + +@pytest.mark.parametrize( + "parent, nested_children, relationships, expected_result, is_error", + [ + (main_node, [uns_child_1, uns_child_2, uns_child_3], [uns_rel_1, uns_rel_2, uns_rel_3], uns_dict_result, False), + (sbp_node, [spb_child_1, spb_child_2, spb_child_3], [spb_rel_1, spb_rel_2, spb_rel_3], spb_result_dict, False), + (None, [spb_child_1, spb_child_2, spb_child_3], [spb_rel_1, spb_rel_2, spb_rel_3], None, True), + (main_node, None, None, {"TestMetric2": "Test_UNS_with_NestedLists", "timestamp": 1486144510000}, False), + ], +) +def test_get_nested_properties( + parent: Node, nested_children: list[Node], relationships: list[Relationship], expected_result: dict, is_error: bool +): + """ + Test GraphQuery.get_nested_properties() + """ + try: + result = GraphQuery.get_nested_properties(parent=parent, nested_children=nested_children, relationships=relationships) + assert result == expected_result + except Exception as ex: + assert is_error, f"Should not throw any exceptions. Got {ex}" + + +@pytest.fixture(scope="session") +def my_event_loop(request): # noqa: ARG001 + """Create an instance of the default event loop for each test case.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() @pytest_asyncio.fixture(scope="module") -async def setup_graphdb_data(): +async def setup_graphdb_data(my_event_loop): # noqa: ARG001 """Fixture to set up data in the GraphDB from the test_data.cypher file.""" # Read the Cypher script as read only with open(file=Path(__file__).resolve().parent / "test_data.cypher") as file: @@ -46,3 +389,66 @@ async def setup_graphdb_data(): await graph_db.execute_query("MATCH (n) DETACH DELETE n;") # Release the driver await graph_db.release_graphdb_driver() + + +@pytest.mark.asyncio +@pytest.mark.integrationtest +@pytest.mark.parametrize( + "topics, expected_result", + [ + ( + ["+", "test/uns/ar2/ln4"], + [ + UNSNode( + namespace="test", + node_name="test", + node_type="ENTERPRISE", + payload={}, + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ), + UNSNode( + namespace="test/uns/ar2/ln4", + node_name="ln4", + node_type="LINE", + payload={ + "TestMetric2": "TestUNSwithNestedLists", + "timestamp": 1486144500000, + "dict_list": [{"a": "b"}, {"x": "y"}], + }, + created=datetime.fromtimestamp(1486144500, UTC), + last_updated=datetime.fromtimestamp(1486144500, UTC), + ), + ], + ), + ], +) +async def test_get_uns_nodes_integration(setup_graphdb_data, topics: list[str], expected_result: list[UNSNode]): # noqa: ARG001 + mqtt_topic_list = [MQTTTopicInput.from_pydantic(MQTTTopic(topic=topic)) for topic in topics] + graph_query = GraphQuery() + try: + result = await graph_query.get_uns_nodes(mqtt_topics=mqtt_topic_list) + except Exception as ex: + pytest.fail(f"Should not throw any exceptions. Got {ex}") + assert result == expected_result # Ensure the result matches the expected result + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "property_keys, topics, exclude_topics,expected_result", + [ + (["prop1"], ["topic1/#"], False, []), + ], +) +async def test_get_uns_nodes_by_property_integration( + setup_graphdb_data, # noqa: ARG001 + property_keys, + topics: list[str], + exclude_topics: bool, + expected_result: dict, +): + pytest.fail() + + +async def test_strawberry_get_uns_nodes(topics: list[str]): + pytest.fail() diff --git a/07_uns_graphql/test/type/test_sparkplugb_node.py b/07_uns_graphql/test/type/test_sparkplugb_node.py index 13336366..966d69db 100644 --- a/07_uns_graphql/test/type/test_sparkplugb_node.py +++ b/07_uns_graphql/test/type/test_sparkplugb_node.py @@ -463,27 +463,20 @@ def test_spb_node(topic: str, payload: Payload | bytes): compare_metrics(spb_node_metric, payload_metric) -def compare_metrics(graphql_metric: SPBMetric, payload_metric: Payload.Metric): +def compare_metrics(graphql_metric: SPBMetric, payload_metric: Payload.Metric): # noqa: C901 """ Utility method to compare metrics and handle float in value, template , dataset """ spb_metric = SPBMetric(payload_metric) - - assert graphql_metric.alias == spb_metric.alias - if payload_metric.HasField("alias"): - assert spb_metric.alias == payload_metric.alias - - assert graphql_metric.is_null == spb_metric.is_null - if payload_metric.HasField("is_null"): - assert spb_metric.is_null == payload_metric.is_null - - assert graphql_metric.is_historical == spb_metric.is_historical - if payload_metric.HasField("is_historical"): - assert spb_metric.is_historical == payload_metric.is_historical - - assert graphql_metric.is_transient == spb_metric.is_transient - if payload_metric.HasField("is_transient"): - assert spb_metric.is_transient == payload_metric.is_transient + fields = [ + "alias", + "is_null", + "is_historical", + "is_transient", + "timestamp", + ] + for field in fields: + assert getattr(graphql_metric, field) == getattr(spb_metric, field) assert graphql_metric.metadata == spb_metric.metadata if payload_metric.HasField("metadata"): @@ -493,8 +486,6 @@ def compare_metrics(graphql_metric: SPBMetric, payload_metric: Payload.Metric): if payload_metric.HasField("properties"): compare_propertyset(graphql_metric.properties, payload_metric.properties) - assert graphql_metric.timestamp == spb_metric.timestamp - assert graphql_metric.datatype == spb_metric.datatype == SPBMetricDataTypes(payload_metric.datatype).name # compare values and handle floating point precision issue