diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index ba94362..2f1db9b 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -1246,6 +1246,36 @@ async def poll_active( return ret + async def nodes( + self, + machine: str, + nodes: Optional[Sequence[str]] = None, + ) -> List[t.JobQueue]: + """Retrieves information about the compute nodes. + This call uses the `scontrol show nodes` command. + + :param machine: the machine name where the scheduler belongs to + :param nodes: specific compute nodes to query + :calls: GET `/compute/nodes` + + GET `/tasks/{taskid}` + + .. warning:: This is available only for FirecREST>=1.16.0 + """ + params = {} + if nodes: + params["nodes"] = ",".join(nodes) + + resp = await self._get_request( + endpoint="/compute/nodes", + additional_headers={"X-Machine-Name": machine}, + params=params, + ) + json_response = self._json_response([resp], 200) + t = ComputeTask(self, json_response["task_id"], [resp]) + result = await t.poll_task("200") + return result + async def cancel(self, machine: str, job_id: str | int) -> str: """Cancels running job. This call uses the `scancel` command. diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py index 0ba12b1..949ed15 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/BasicClient.py @@ -1082,6 +1082,38 @@ def poll_active( ) return list(dict_result.values()) + def nodes( + self, + machine: str, + nodes: Optional[Sequence[str]] = None, + ) -> List[t.JobQueue]: + """Retrieves information about the compute nodes. + This call uses the `scontrol show nodes` command. + + :param machine: the machine name where the scheduler belongs to + :param nodes: specific compute nodes to query + :calls: GET `/compute/nodes` + + GET `/tasks/{taskid}` + + .. warning:: This is available only for FirecREST>=1.16.0 + """ + params = {} + if nodes: + params["nodes"] = ",".join(nodes) + + resp = self._get_request( + endpoint="/compute/nodes", + additional_headers={"X-Machine-Name": machine}, + params=params, + ) + self._current_method_requests.append(resp) + json_response = self._json_response(self._current_method_requests, 200) + result = self._poll_tasks( + json_response["task_id"], "200", iter([1, 0.5, 0.25]) + ) + return result + def cancel(self, machine: str, job_id: str | int) -> str: """Cancels running job. This call uses the `scancel` command. diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index 48e366b..3305b65 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -1292,6 +1292,54 @@ def poll_active( raise typer.Exit(code=1) +@app.command(rich_help_panel="Compute commands") +def get_nodes( + config_from_parent: str = typer.Option(None, + callback=config_parent_load_callback, + is_eager=True, + hidden=True + ), + system: str = typer.Option( + ..., "-s", "--system", help="The name of the system.", envvar="FIRECREST_SYSTEM" + ), + nodes: Optional[List[str]] = typer.Argument( + None, help="List of specific compute nodes to query." + ), + raw: bool = typer.Option(False, "--raw", help="Print unformatted."), +): + """Retrieves information about the compute nodes. + This call uses the `scontrol show nodes` command + """ + try: + results = client.nodes(system, nodes) + if raw: + console.print(results) + else: + parsed_results = [] + for item in results: + parsed_item = {} + for key, value in item.items(): + if isinstance(value, list): + parsed_item[key] = ", ".join(value) + else: + parsed_item[key] = str(value) + + parsed_results.append(parsed_item) + + table = create_table( + "Information about jobs in the queue", + parsed_results, + ("Name", "NodeName"), + ("Partitions", "Partitions"), + ("State", "State"), + ("Active Features", "ActiveFeatures"), + ) + console.print(table) + except Exception as e: + examine_exeption(e) + raise typer.Exit(code=1) + + @app.command(rich_help_panel="Compute commands") def cancel( config_from_parent: str = typer.Option(None, @@ -1312,8 +1360,8 @@ def cancel( raise typer.Exit(code=1) -@reservation_app.command(rich_help_panel="Reservation commands") -def list( +@reservation_app.command(name='list', rich_help_panel="Reservation commands") +def list_command( config_from_parent: str = typer.Option(None, callback=config_parent_load_callback, is_eager=True, diff --git a/tests/test_compute.py b/tests/test_compute.py index 9d9315e..cbcfb1b 100644 --- a/tests/test_compute.py +++ b/tests/test_compute.py @@ -123,6 +123,28 @@ def submit_path_handler(request: Request): ) +def nodes_request_handler(request: Request): + if not request.query_string or request.query_string == b'nodes=nid001': + ret = { + "success": "Task created", + "task_id": "nodes_info", + "task_url": "/tasks/nodes_info" + } + status_code = 200 + + if request.query_string == b'nodes=nidunknown': + ret = { + "success": "Task created", + "task_id": "info_unknown_node", + "task_url": "/tasks/info_unknown_node" + } + status_code = 200 + + return Response( + json.dumps(ret), status=status_code, content_type="application/json" + ) + + def submit_upload_handler(request: Request): if request.headers["Authorization"] != "Bearer VALID_TOKEN": return Response( @@ -845,6 +867,62 @@ def tasks_handler(request: Request): } status_code = 200 + if taskid == "nodes_info": + ret = { + "tasks": { + taskid: { + "created_at": "2024-04-16T09:47:06", + "data": [ + { + "ActiveFeatures": [ + "f7t" + ], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02", + ], + "State": [ + "IDLE" + ] + } + ], + "description": "Finished successfully", + "hash_id": "nodes_info", + "last_modify": "2024-04-16T09:47:06", + "service": "compute", + "status": "200", + "system": "cluster", + "task_id": "nodes_info", + "task_url": "/tasks/nodes_info", + "updated_at": "2024-04-16T09:47:06", + "user": "service-account-firecrest-sample" + } + } + } + status_code = 200 + + if taskid == "info_unknown_node": + ret = { + "tasks": { + taskid: { + "created_at": "2024-04-16T09:56:14", + "data": "Node nidunknown not found", + "description": "Finished with errors", + "hash_id": "info_unknown_node", + "last_modify": "2024-04-16T09:56:14", + "service": "compute", + "status": "400", + "system": "cluster", + "task_id": "info_unknown_node", + "task_url": "/tasks/info_unknown_node", + "updated_at": "2024-04-16T09:56:14", + "user": "service-account-firecrest-sample" + } + } + } + status_code = 400 + return Response( json.dumps(ret), status=status_code, content_type="application/json" ) @@ -876,6 +954,10 @@ def fc_server(httpserver): "/tasks", method="GET" ).respond_with_handler(tasks_handler) + httpserver.expect_request( + "/compute/nodes", method="GET" + ).respond_with_handler(nodes_request_handler) + return httpserver @@ -1216,6 +1298,17 @@ def test_poll_active(valid_client): ] +def test_cli_get_nodes(valid_credentials): + args = valid_credentials + ["get-nodes", "--system", "cluster1", "nid001"] + result = runner.invoke(cli.app, args=args) + stdout = common.clean_stdout(result.stdout) + assert result.exit_code == 0 + assert "Information about jobs in the queue" in stdout + assert "nid001" in stdout + assert "part01, part02" in stdout + assert "IDLE" in stdout + assert "f7t" in stdout + def test_cli_poll_active(valid_credentials): global queue_retry queue_retry = 0 @@ -1294,3 +1387,38 @@ def test_cancel_invalid_machine(valid_client): def test_cancel_invalid_client(invalid_client): with pytest.raises(firecrest.UnauthorizedException): invalid_client.cancel(machine="cluster1", job_id=35360071) + + +def test_get_nodes(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert valid_client.nodes(machine="cluster1") == response + + +def test_get_nodes_from_list(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert valid_client.nodes(machine="cluster1", nodes=["nid001"]) == response + + +def test_get_nodes_unknown(valid_client): + with pytest.raises(firecrest.FirecrestException): + valid_client.nodes(machine="cluster1", nodes=["nidunknown"]) diff --git a/tests/test_compute_async.py b/tests/test_compute_async.py index 861bc7f..c40b681 100644 --- a/tests/test_compute_async.py +++ b/tests/test_compute_async.py @@ -92,6 +92,10 @@ def fc_server(httpserver): "/tasks", method="GET" ).respond_with_handler(basic_compute.tasks_handler) + httpserver.expect_request( + "/compute/nodes", method="GET" + ).respond_with_handler(basic_compute.nodes_request_handler) + return httpserver @@ -407,3 +411,43 @@ async def test_cancel_invalid_machine(valid_client): async def test_cancel_invalid_client(invalid_client): with pytest.raises(firecrest.UnauthorizedException): await invalid_client.cancel(machine="cluster1", job_id=35360071) + + +@pytest.mark.asyncio +async def test_get_nodes(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert await valid_client.nodes(machine="cluster1") == response + + +@pytest.mark.asyncio +async def test_get_nodes_from_list(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert await valid_client.nodes(machine="cluster1", + nodes=["nid001"]) == response + + +@pytest.mark.asyncio +async def test_get_nodes_unknown(valid_client): + with pytest.raises(firecrest.FirecrestException): + await valid_client.nodes(machine="cluster1", + nodes=["nidunknown"])