Skip to content

Commit

Permalink
Added progress monitor and tests (#105)
Browse files Browse the repository at this point in the history
Co-authored-by: pyansys-ci-bot <92810346+pyansys-ci-bot@users.noreply.github.com>
  • Loading branch information
philipjusher and pyansys-ci-bot authored Sep 4, 2024
1 parent 45861f8 commit 1b7f453
Show file tree
Hide file tree
Showing 10 changed files with 408 additions and 108 deletions.
1 change: 1 addition & 0 deletions doc/changelog.d/105.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added progress monitor and tests
10 changes: 7 additions & 3 deletions examples/simple_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import datetime
import os
from pathlib import Path
import time

import plotly.graph_objects as go

from ansys.conceptev.core import app, auth

# ## Set up environment variables
# Preferred method is to use AnsysID. So set to True.
# AnsysID is the only supported method.
# We only use the other one here for automated testing. So set to True.
use_ansys_id = False # True


if not (use_ansys_id):
# Set environment variables for ConceptEV username and password if they don't exist!
if os.environ.get("CONCEPTEV_USERNAME") is None:
Expand Down Expand Up @@ -229,9 +232,10 @@
# Create and submit a job
concept = app.get(client, "/concepts", id=design_instance_id, params={"populated": True})
job_info = app.create_submit_job(client, concept, account_id, hpc_id)

# Read the results and show the result in your browser
results = app.read_results(client, job_info, calculate_units=False, rate_limit=3)
if not use_ansys_id:
time.sleep(120) # wait for it to complete - not needed in real code
results = app.read_results(client, job_info, calculate_units=False)
x = results[0]["capability_curve"]["speeds"]
y = results[0]["capability_curve"]["torques"]

Expand Down
188 changes: 110 additions & 78 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ httpx = "^0.26.0"
msal = "^1.29.0"
msal-extensions = "^1.2.0"
tomli = {version = "^2.0.1", python = "<3.11"}
websockets = "^13.0.1"

[tool.poetry.group.dev.dependencies] # Common packages for test and examples
plotly = "^5.18.0"
Expand Down Expand Up @@ -80,6 +81,7 @@ pytest-httpx = "^0.29.0"
pytest-mock = "^3.12.0"

# Optional build requirements
pytest-asyncio = "^0.24.0"
[tool.poetry.group.build]
optional = true

Expand Down
72 changes: 48 additions & 24 deletions src/ansys/conceptev/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import datetime
from json import JSONDecodeError
import os
import time
from typing import Literal

import dotenv
import httpx

from ansys.conceptev.core import auth
from ansys.conceptev.core.progress import check_status, monitor_job_progress

dotenv.load_dotenv()

Expand Down Expand Up @@ -195,9 +195,8 @@ def create_new_concept(
if created_design.status_code not in (200, 204):
raise Exception(f"Failed to create a design on OCM {created_design.content}.")

user_details = httpx.post(osm_url + "/user/details", headers={"Authorization": token})
if user_details.status_code not in (200, 204):
raise Exception(f"Failed to get a user details on OCM {user_details}.")
user_id = get_user_id(token)

design_instance_id = created_design.json()["designInstanceList"][0]["designInstanceId"]
concept_data = {
"capabilities_ids": [],
Expand All @@ -210,7 +209,7 @@ def create_new_concept(
"name": "Branch 1",
"project_id": project_id,
"requirements_ids": [],
"user_id": user_details.json()["userId"],
"user_id": user_id,
}

query = {
Expand All @@ -221,6 +220,16 @@ def create_new_concept(
return created_concept


def get_user_id(token):
"""Get the user ID."""
osm_url = auth.config["OCM_URL"]
user_details = httpx.post(osm_url + "/user/details", headers={"Authorization": token})
if user_details.status_code not in (200, 204):
raise Exception(f"Failed to get a user details on OCM {user_details}.")
user_id = user_details.json()["userId"]
return user_id


def get_concept_ids(client: httpx.Client) -> dict:
"""Get concept IDs."""
concepts = get(client, "/concepts")
Expand Down Expand Up @@ -290,29 +299,44 @@ def read_results(
client,
job_info: dict,
calculate_units: bool = True,
no_of_tries: int = 200,
rate_limit: float = 0.3,
) -> dict:
"""Read job results.
"""Read job results."""
job_id = job_info["job_id"]
token = client.headers["Authorization"]
user_id = get_user_id(token)
initial_status = get_status(job_info, token)
if check_status(initial_status): # Job already completed
return get_results(client, job_info, calculate_units)
else: # Job is still running
monitor_job_progress(job_id, user_id, token) # Wait for completion
return get_results(client, job_info, calculate_units)

Continuously request job results until a valid response is received or a limit of tries is
reached.
"""

def get_results(client, job_info: dict, calculate_units: bool = True):
"""Get the results."""
version_number = get(client, "/utilities:data_format_version")
for _ in range(0, no_of_tries):
response = client.post(
url="/jobs:result",
json=job_info,
params={
"results_file_name": f"output_file_v{version_number}.json",
"calculate_units": calculate_units,
},
)
time.sleep(rate_limit)
if response.status_code == 200:
return response.json()
response = client.post(
url="/jobs:result",
json=job_info,
params={
"results_file_name": f"output_file_v{version_number}.json",
"calculate_units": calculate_units,
},
)
return process_response(response)


raise Exception(f"There are too many requests: {response}.")
def get_status(job_info: dict, token: str) -> str:
"""Get the status of the job."""
ocm_url = auth.config["OCM_URL"]
response = httpx.post(
url=ocm_url + "/job/load",
json={"jobId": job_info["job_id"]},
headers={"Authorization": token},
)
processed_response = process_response(response)
initial_status = processed_response["jobStatus"][-1]["jobStatus"]
return initial_status


def post_component_file(client: httpx.Client, filename: str, component_file_type: str) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion src/ansys/conceptev/core/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def get_ansyId_token(app) -> str:
# Now let's try to find a token in cache for this account
result = app.acquire_token_silent(scopes=[scope], account=chosen)
if not result:
result = app.acquire_token_interactive(scopes=[scope], timeout=10)
result = app.acquire_token_interactive(scopes=[scope])
if "access_token" in result:
return result["access_token"]
error = result.get("error")
Expand Down
103 changes: 103 additions & 0 deletions src/ansys/conceptev/core/progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (C) 2023 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Progress monitoring with websockets."""

import asyncio
import json
import ssl

import certifi
from websockets.asyncio.client import connect

STATUS_COMPLETE = "complete"
STATUS_FINISHED = "FINISHED"
STATUS_ERROR = "failed"

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(certifi.where())


def connect_to_ocm(user_id: str, token: str):
"""Connect to the OnScale Cloud Messaging service."""
uri = (
f"wss://sockets.prod.portal.onscale.com/socket/user?userId={user_id}&Authorization={token}"
)
return connect(uri, ssl=ssl_context)


def parse_message(message: str, job_id: str):
"""Parse the message and return the status or progress."""
message_data = json.loads(message)

if message_data.get("jobId", "Unknown") == job_id:
message_type = message_data.get("messagetype", None)
if message_type == "status":
status = message_data.get("status", None)
print(f"Status:{status}")
return status
elif message_type == "progress":
progress = message_data.get("progress", None)
print(f"Progress:{progress}")
elif message_type == "error":
error = message_data.get("message", None)
print(f"Error:{error}")


async def monitor_job_messages(job_id: str, user_id: str, token: str):
"""Monitor job messages and return the status when complete."""
websocket_client = connect_to_ocm(user_id, token)
async with websocket_client as websocket:

print("Connected to OCM Websockets.")
async for message in websocket:
status = parse_message(message, job_id)
if check_status(status):
return status


def check_status(status: str):
"""Check if the status is complete or finished."""
if status == STATUS_COMPLETE or status == STATUS_FINISHED:
return True
elif status == STATUS_ERROR:
raise Exception("Job Failed")
else:
return False


def monitor_job_progress(job_id: str, user_id: str, token: str):
"""Monitor job progress and return the status when complete."""
result = asyncio.run(monitor_job_messages(job_id, user_id, token))
return result


if __name__ == "__main__":
"""Monitor a single job progress."""
from ansys.conceptev.core.app import get_user_id
from ansys.conceptev.core.auth import create_msal_app, get_ansyId_token

job_id = "ae3f3b4b-91d8-4cdd-8fa3-25eb202a561e" # Replace with your job ID
msal_app = create_msal_app()
token = get_ansyId_token(msal_app)
user_id = get_user_id(token)
monitor_job_progress(job_id, user_id, token)
8 changes: 7 additions & 1 deletion tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def test_read_file(mocker):


def test_read_results(httpx_mock: HTTPXMock, client: httpx.Client):
example_job_info = {"job": "mocked_job"}
example_job_info = {"job": "mocked_job", "job_id": "123"}
example_results = {"results": "returned"}
httpx_mock.add_response(
url=f"{conceptev_url}/utilities:data_format_version?design_instance_id=123",
Expand All @@ -317,6 +317,12 @@ def test_read_results(httpx_mock: HTTPXMock, client: httpx.Client):
match_json=example_job_info,
json=example_results,
)
httpx_mock.add_response(
url=ocm_url + "/user/details", method="post", json={"userId": "user_123"}
)
httpx_mock.add_response(
url=ocm_url + "/job/load", method="post", json={"jobStatus": [{"jobStatus": "In Progress"}]}
)
results = app.read_results(client, example_job_info)
assert example_results == results

Expand Down
2 changes: 1 addition & 1 deletion tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_accounts(self):
def acquire_token_silent(self, scopes, account):
return {"access_token": "mock_cached_token"}

def acquire_token_interactive(self, scopes, timeout):
def acquire_token_interactive(self, scopes):
return {"access_token": "mock_token"}


Expand Down
Loading

0 comments on commit 1b7f453

Please sign in to comment.