Skip to content

Commit

Permalink
updated graph db graphQL queries and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mkashwin committed Sep 15, 2024
1 parent 83f6053 commit 02738af
Show file tree
Hide file tree
Showing 9 changed files with 1,135 additions and 135 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"createdb",
"createrole",
"createuser",
"cypher",
"DBIRTH",
"dbuser",
"DCMD",
Expand Down Expand Up @@ -112,7 +113,7 @@
"./07_uns_graphql/src",
"./07_uns_graphql/test",
],
"python.languageServer": "Pylance",
"python.languageServer": "Default",
"python.missingPackage.severity": "Error",
"ruff.showNotifications": "always",
"ruff.trace.server": "messages",
Expand Down
4 changes: 2 additions & 2 deletions 03_uns_graphdb/src/uns_graphdb/graphdb_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ def connect(self, retry: int = 0) -> neo4j.Driver:
----------
retry: int
Optional parameters to retry making a connection in case of errors.
The max number of retry is `GraphDBHandler.MAX_RETRIES`
The time between attempts is `GraphDBHandler.SLEEP_BTW_ATTEMPT`
The max number of retry is `GraphDBHandler.max_retry`
The time between attempts is `GraphDBHandler.sleep_btw_attempts`
Returns:
neo4j.Driver: The Neo4j driver object.
Expand Down
108 changes: 99 additions & 9 deletions 07_uns_graphql/schema/uns_schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,102 @@ type Query {
"""
getUnsNodes(topics: [MQTTTopicInput!]!): [UNSNode!]!

"""
Get all UNSNodes published which have specific attributes.Option binary_operator input allows chaining the list of property_keys. If NULL, property_keys will be ORed- OR: Either of the property_keys must exist in the same node. If only one property_keys provided will be ignored- AND: All property_keys must exist in same node. If only one property_keys provided will be ignored- NOT: None of the provided property_keys should exist in the same nodeOther criteria - topics will always be ANDed to the query filter
"""
getUnsNodesByProperty(propertyKeys: [String!]!, binaryOperator: BinaryOperator, topics: [MQTTTopicInput!]): [UNSNode!]!
"Get all UNSNodes published which have specific attribute name as 'property_keys'. \n Optionally Filter results with list of topics. \nIf topics are provided then optional boolean value exclude_topics attribute can be set to true .\nto exclude the nodes which match the topics.\n"
getUnsNodesByProperty(propertyKeys: [String!]!, topics: [MQTTTopicInput!], excludeTopics: Boolean = false): [UNSNode!]!

"""
Get consolidation of sparkplugB nodes. MQTT wildcards are supportedEnsure that the topics are all in the SparkplugB namespace i.e. starts with 'spBv1.0/' Format of an SpB topic is spBv1.0/ <group_id> / <message_type> / <edge_node_id> / [<device_id>]
"""
getSpbNodes(topics: [MQTTTopicInput!]!): [UNSNode!]!
"""Get all the SPBNode by the provided metric name"""
getSpbNodesByMetric(metricNames: [String!]!): [SPBNode!]!
}

type SPBDataSet {
numOfColumns: Union!
columns: [String!]!
types: [String!]!
rows: [SPBDataSetRow!]!
}

type SPBDataSetRow {
elements: [SPBDataSetValue!]!
}

type SPBDataSetValue {
value: SPBPrimitive!
}

type SPBMetadata {
isMultiPart: Boolean
contentType: String
size: Union
seq: Union
fileName: String
fileType: String
md5: String
description: String
}

type SPBMetric {
name: String!
alias: Union
timestamp: DateTime!
datatype: String!
isHistorical: Boolean
isTransient: Boolean
isNull: Boolean
metadata: SPBMetadata
properties: SPBPropertySet
value: SPBPrimitiveBytesPayloadSPBDataSetSPBTemplate!
}

type SPBNode {
topic: String!
timestamp: DateTime!
metrics: [SPBMetric!]!
seq: Union!
uuid: ID
body: Base64
}

"""
Wrapper for primitive types in Sparkplug.: int, float, str, bool, list
Needed because GraphQL does not support str for unions.
Data is converted to its String representation for convenience.
Use the datatype to convert to actual type if needed
"""
type SPBPrimitive {
data: String!
}

union SPBPrimitiveBytesPayloadSPBDataSetSPBTemplate = SPBPrimitive | BytesPayload | SPBDataSet | SPBTemplate

union SPBPrimitiveSPBPropertySetSPBPropertySetList = SPBPrimitive | SPBPropertySet | SPBPropertySetList

type SPBPropertySet {
keys: [String!]!
values: [SPBPropertyValue!]!
}

type SPBPropertySetList {
propertysets: [SPBPropertySet!]!
}

type SPBPropertyValue {
isNull: Boolean
datatype: String!
value: SPBPrimitiveSPBPropertySetSPBPropertySetList!
}

type SPBTemplate {
version: String
metrics: [SPBMetric!]!
parameters: [SPBTemplateParameter!]
templateRef: String
isDefinition: Boolean
}

type SPBTemplateParameter {
name: String!
datatype: String!
value: SPBPrimitive!
}

type StreamingMessage {
Expand All @@ -112,4 +199,7 @@ type UNSNode {
payload: JSONPayload!
created: DateTime!
lastUpdated: DateTime!
}
}

"""Int 64 field since GraphQL doesn't support int64, only int 32"""
scalar Union
25 changes: 19 additions & 6 deletions 07_uns_graphql/src/uns_graphql/backend/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Encapsulates integration with the Graph database database
"""

import asyncio
import logging

from neo4j import AsyncDriver, AsyncGraphDatabase, AsyncResult, Record
Expand All @@ -26,6 +27,9 @@

LOGGER = logging.getLogger(__name__)

MAX_RETRIES = 5
SLEEP_BTW_ATTEMPT = 10


class GraphDB:
"""
Expand All @@ -37,17 +41,24 @@ class GraphDB:
_graphdb_driver: AsyncDriver = None

@classmethod
async def get_graphdb_driver(cls) -> AsyncDriver:
async def get_graphdb_driver(cls, retry: int = 0) -> AsyncDriver:
"""
Returns the Neo4j async driver which is the connection to the database.
Validates if the current driver is still connected, and if not, creates a new connection.
The driver is cached and reused for subsequent requests to avoid creating multiple connections.
Parameters
----------
retry: int
Optional parameters to retry making a connection in case of errors.
The max number of retry is `MAX_RETRIES`
The time between attempts is `SLEEP_BTW_ATTEMPT` seconds
Returns:
AsyncDriver: The Neo4j async driver.
"""
LOGGER.debug("GraphDB driver requested")
current_loop = asyncio.get_event_loop()
if cls._graphdb_driver is None:
LOGGER.info("Creating a new GraphDB driver")
cls._graphdb_driver = AsyncGraphDatabase.driver(
Expand All @@ -60,11 +71,13 @@ async def get_graphdb_driver(cls) -> AsyncDriver:
LOGGER.error("Failed to verify GraphDB driver connectivity: %s", str(ex), stack_info=True, exc_info=True)
# In case of connectivity failure, close the existing driver and create a new one
await cls.release_graphdb_driver()
cls._graphdb_driver = AsyncGraphDatabase.driver(
uri=GraphDBConfig.conn_url, auth=(GraphDBConfig.user, GraphDBConfig.password)
)
await cls._graphdb_driver.verify_connectivity()
LOGGER.info("Reconnected to GraphDB driver")
if retry >= MAX_RETRIES:
LOGGER.error("No. of retries exceeded %s", str(MAX_RETRIES), stack_info=True, exc_info=True)
raise ex # Re-raise the exception after cleanup

await asyncio.sleep(SLEEP_BTW_ATTEMPT)
cls._graphdb_driver = await cls.get_graphdb_driver(retry + 1)
LOGGER.info("Trying to Reconnect to GraphDB driver")

return cls._graphdb_driver

Expand Down
Loading

0 comments on commit 02738af

Please sign in to comment.