Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node info endpoint #101

Merged
merged 7 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,36 @@ async def poll_active(

return ret

async def nodes(
    self,
    machine: str,
    nodes: Optional[Sequence[str]] = None,
) -> List[t.JobQueue]:
    """Retrieves information about the compute nodes.
    This call uses the `scontrol show nodes` command.

    :param machine: the machine name where the scheduler belongs to
    :param nodes: specific compute nodes to query; a single node name
                  passed as a plain string is treated as a one-element
                  list, while `None` or an empty sequence sends no
                  `nodes` filter at all
    :calls: GET `/compute/nodes`

            GET `/tasks/{taskid}`

    .. warning:: This is available only for FirecREST>=1.16.0
    """
    # NOTE(review): the return annotation reuses `t.JobQueue` although the
    # payload describes nodes - confirm whether a dedicated node type
    # should be used instead.

    # A bare string satisfies `Sequence[str]`, but joining it would split
    # the name into single characters ("nid001" -> "n,i,d,0,0,1").
    if isinstance(nodes, str):
        nodes = [nodes]

    params = {}
    if nodes:
        params["nodes"] = ",".join(nodes)

    resp = await self._get_request(
        endpoint="/compute/nodes",
        additional_headers={"X-Machine-Name": machine},
        params=params,
    )
    json_response = self._json_response([resp], 200)
    # Named `task` (not `t`) so the module alias `t` used in the signature
    # is not shadowed inside the method body.
    task = ComputeTask(self, json_response["task_id"], [resp])
    result = await task.poll_task("200")
    return result

async def cancel(self, machine: str, job_id: str | int) -> str:
"""Cancels running job.
This call uses the `scancel` command.
Expand Down
32 changes: 32 additions & 0 deletions firecrest/BasicClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,38 @@ def poll_active(
)
return list(dict_result.values())

def nodes(
    self,
    machine: str,
    nodes: Optional[Sequence[str]] = None,
) -> List[t.JobQueue]:
    """Retrieves information about the compute nodes.
    This call uses the `scontrol show nodes` command.

    :param machine: the machine name where the scheduler belongs to
    :param nodes: specific compute nodes to query; a single node name
                  passed as a plain string is treated as a one-element
                  list, while `None` or an empty sequence sends no
                  `nodes` filter at all
    :calls: GET `/compute/nodes`

            GET `/tasks/{taskid}`

    .. warning:: This is available only for FirecREST>=1.16.0
    """
    # NOTE(review): the return annotation reuses `t.JobQueue` although the
    # payload describes nodes - confirm whether a dedicated node type
    # should be used instead.

    # A bare string satisfies `Sequence[str]`, but joining it would split
    # the name into single characters ("nid001" -> "n,i,d,0,0,1").
    if isinstance(nodes, str):
        nodes = [nodes]

    params = {}
    if nodes:
        params["nodes"] = ",".join(nodes)

    resp = self._get_request(
        endpoint="/compute/nodes",
        additional_headers={"X-Machine-Name": machine},
        params=params,
    )
    self._current_method_requests.append(resp)
    json_response = self._json_response(self._current_method_requests, 200)
    # Poll the created task until it reports status "200"; the iterator
    # is handed to `_poll_tasks` as its sleep schedule argument.
    result = self._poll_tasks(
        json_response["task_id"], "200", iter([1, 0.5, 0.25])
    )
    return result

def cancel(self, machine: str, job_id: str | int) -> str:
"""Cancels running job.
This call uses the `scancel` command.
Expand Down
52 changes: 50 additions & 2 deletions firecrest/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,54 @@ def poll_active(
raise typer.Exit(code=1)


@app.command(rich_help_panel="Compute commands")
def get_nodes(
    config_from_parent: str = typer.Option(
        None,
        callback=config_parent_load_callback,
        is_eager=True,
        hidden=True,
    ),
    system: str = typer.Option(
        ..., "-s", "--system", help="The name of the system.", envvar="FIRECREST_SYSTEM"
    ),
    nodes: Optional[List[str]] = typer.Argument(
        None, help="List of specific compute nodes to query."
    ),
    raw: bool = typer.Option(False, "--raw", help="Print unformatted."),
):
    """Retrieves information about the compute nodes.
    This call uses the `scontrol show nodes` command
    """
    try:
        node_records = client.nodes(system, nodes)
        if raw:
            # Raw mode prints the client result untouched and stops here.
            console.print(node_records)
            return

        # Every cell is turned into text before it reaches the table:
        # list values are comma-joined, everything else goes through str().
        rows = [
            {
                field: ", ".join(content)
                if isinstance(content, list)
                else str(content)
                for field, content in record.items()
            }
            for record in node_records
        ]

        # NOTE(review): the title mentions jobs although the rows describe
        # nodes; the CLI test asserts this exact text, so it is kept as is.
        table = create_table(
            "Information about jobs in the queue",
            rows,
            ("Name", "NodeName"),
            ("Partitions", "Partitions"),
            ("State", "State"),
            ("Active Features", "ActiveFeatures"),
        )
        console.print(table)
    except Exception as e:
        examine_exeption(e)
        raise typer.Exit(code=1)


@app.command(rich_help_panel="Compute commands")
def cancel(
config_from_parent: str = typer.Option(None,
Expand All @@ -1312,8 +1360,8 @@ def cancel(
raise typer.Exit(code=1)


@reservation_app.command(rich_help_panel="Reservation commands")
def list(
@reservation_app.command(name='list', rich_help_panel="Reservation commands")
def list_command(
config_from_parent: str = typer.Option(None,
callback=config_parent_load_callback,
is_eager=True,
Expand Down
128 changes: 128 additions & 0 deletions tests/test_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,28 @@ def submit_path_handler(request: Request):
)


def nodes_request_handler(request: Request):
    """Fake handler for GET `/compute/nodes` used by the test HTTP server.

    An empty query string or `nodes=nid001` answers with the `nodes_info`
    task, `nodes=nidunknown` answers with the `info_unknown_node` task.
    Any other query string gets an explicit 400 response instead of
    failing on unbound locals.

    :param request: incoming request; only `query_string` is inspected
    :returns: a JSON `Response` carrying the task reference or the error
    """
    query = request.query_string

    if not query or query == b'nodes=nid001':
        ret = {
            "success": "Task created",
            "task_id": "nodes_info",
            "task_url": "/tasks/nodes_info"
        }
        status_code = 200
    elif query == b'nodes=nidunknown':
        ret = {
            "success": "Task created",
            "task_id": "info_unknown_node",
            "task_url": "/tasks/info_unknown_node"
        }
        status_code = 200
    else:
        # Previously this path left `ret`/`status_code` undefined and the
        # handler crashed with UnboundLocalError.
        ret = {
            "description": "Unexpected query string in nodes request",
            "error": query.decode("utf-8", errors="replace"),
        }
        status_code = 400

    return Response(
        json.dumps(ret), status=status_code, content_type="application/json"
    )


def submit_upload_handler(request: Request):
if request.headers["Authorization"] != "Bearer VALID_TOKEN":
return Response(
Expand Down Expand Up @@ -845,6 +867,62 @@ def tasks_handler(request: Request):
}
status_code = 200

if taskid == "nodes_info":
ret = {
"tasks": {
taskid: {
"created_at": "2024-04-16T09:47:06",
"data": [
{
"ActiveFeatures": [
"f7t"
],
"NodeName": "nid001",
"Partitions": [
"part01",
"part02",
],
"State": [
"IDLE"
]
}
],
"description": "Finished successfully",
"hash_id": "nodes_info",
"last_modify": "2024-04-16T09:47:06",
"service": "compute",
"status": "200",
"system": "cluster",
"task_id": "nodes_info",
"task_url": "/tasks/nodes_info",
"updated_at": "2024-04-16T09:47:06",
"user": "service-account-firecrest-sample"
}
}
}
status_code = 200

if taskid == "info_unknown_node":
ret = {
"tasks": {
taskid: {
"created_at": "2024-04-16T09:56:14",
"data": "Node nidunknown not found",
"description": "Finished with errors",
"hash_id": "info_unknown_node",
"last_modify": "2024-04-16T09:56:14",
"service": "compute",
"status": "400",
"system": "cluster",
"task_id": "info_unknown_node",
"task_url": "/tasks/info_unknown_node",
"updated_at": "2024-04-16T09:56:14",
"user": "service-account-firecrest-sample"
}
}
}
status_code = 400

return Response(
json.dumps(ret), status=status_code, content_type="application/json"
)
Expand Down Expand Up @@ -876,6 +954,10 @@ def fc_server(httpserver):
"/tasks", method="GET"
).respond_with_handler(tasks_handler)

httpserver.expect_request(
"/compute/nodes", method="GET"
).respond_with_handler(nodes_request_handler)

return httpserver


Expand Down Expand Up @@ -1216,6 +1298,17 @@ def test_poll_active(valid_client):
]


def test_cli_get_nodes(valid_credentials):
    """Run `get-nodes` for nid001 through the CLI and check the table text."""
    cli_args = valid_credentials + ["get-nodes", "--system", "cluster1", "nid001"]
    outcome = runner.invoke(cli.app, args=cli_args)
    stdout = common.clean_stdout(outcome.stdout)
    assert outcome.exit_code == 0

    # Title first, then one fragment per rendered column.
    expected_fragments = (
        "Information about jobs in the queue",
        "nid001",
        "part01, part02",
        "IDLE",
        "f7t",
    )
    for fragment in expected_fragments:
        assert fragment in stdout

def test_cli_poll_active(valid_credentials):
global queue_retry
queue_retry = 0
Expand Down Expand Up @@ -1294,3 +1387,38 @@ def test_cancel_invalid_machine(valid_client):
def test_cancel_invalid_client(invalid_client):
    """Cancelling through the invalid client must raise UnauthorizedException."""
    cancel_kwargs = {"machine": "cluster1", "job_id": 35360071}
    with pytest.raises(firecrest.UnauthorizedException):
        invalid_client.cancel(**cancel_kwargs)


def test_get_nodes(valid_client):
    """`nodes()` without a node filter yields the single nid001 record."""
    expected_nodes = [
        {
            "ActiveFeatures": ["f7t"],
            "NodeName": "nid001",
            "Partitions": ["part01", "part02"],
            "State": ["IDLE"],
        }
    ]
    actual_nodes = valid_client.nodes(machine="cluster1")
    assert actual_nodes == expected_nodes


def test_get_nodes_from_list(valid_client):
    """`nodes()` filtered on ["nid001"] yields the single nid001 record."""
    expected_nodes = [
        {
            "ActiveFeatures": ["f7t"],
            "NodeName": "nid001",
            "Partitions": ["part01", "part02"],
            "State": ["IDLE"],
        }
    ]
    actual_nodes = valid_client.nodes(machine="cluster1", nodes=["nid001"])
    assert actual_nodes == expected_nodes


def test_get_nodes_unknown(valid_client):
    """Asking for the node "nidunknown" must raise FirecrestException."""
    unknown_nodes = ["nidunknown"]
    with pytest.raises(firecrest.FirecrestException):
        valid_client.nodes(machine="cluster1", nodes=unknown_nodes)
44 changes: 44 additions & 0 deletions tests/test_compute_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def fc_server(httpserver):
"/tasks", method="GET"
).respond_with_handler(basic_compute.tasks_handler)

httpserver.expect_request(
"/compute/nodes", method="GET"
).respond_with_handler(basic_compute.nodes_request_handler)

return httpserver


Expand Down Expand Up @@ -407,3 +411,43 @@ async def test_cancel_invalid_machine(valid_client):
async def test_cancel_invalid_client(invalid_client):
    """Cancelling through the invalid client must raise UnauthorizedException."""
    cancel_kwargs = {"machine": "cluster1", "job_id": 35360071}
    with pytest.raises(firecrest.UnauthorizedException):
        await invalid_client.cancel(**cancel_kwargs)


@pytest.mark.asyncio
async def test_get_nodes(valid_client):
    """Awaiting `nodes()` without a filter yields the single nid001 record."""
    expected_nodes = [
        {
            "ActiveFeatures": ["f7t"],
            "NodeName": "nid001",
            "Partitions": ["part01", "part02"],
            "State": ["IDLE"],
        }
    ]
    actual_nodes = await valid_client.nodes(machine="cluster1")
    assert actual_nodes == expected_nodes


@pytest.mark.asyncio
async def test_get_nodes_from_list(valid_client):
    """Awaiting `nodes()` filtered on ["nid001"] yields the nid001 record."""
    expected_nodes = [
        {
            "ActiveFeatures": ["f7t"],
            "NodeName": "nid001",
            "Partitions": ["part01", "part02"],
            "State": ["IDLE"],
        }
    ]
    actual_nodes = await valid_client.nodes(
        machine="cluster1",
        nodes=["nid001"],
    )
    assert actual_nodes == expected_nodes
ekouts marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.asyncio
async def test_get_nodes_unknown(valid_client):
    """Asking for the node "nidunknown" must raise FirecrestException."""
    unknown_nodes = ["nidunknown"]
    with pytest.raises(firecrest.FirecrestException):
        await valid_client.nodes(machine="cluster1", nodes=unknown_nodes)
ekouts marked this conversation as resolved.
Show resolved Hide resolved
Loading