From 9863a1c94e7a0e3f8ec72995f47afede09294002 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 17 Apr 2024 18:17:29 +0200 Subject: [PATCH 1/7] add node info endpoint --- firecrest/AsyncClient.py | 29 ++++++++ firecrest/BasicClient.py | 31 +++++++++ firecrest/cli/__init__.py | 52 ++++++++++++++- tests/test_compute.py | 128 ++++++++++++++++++++++++++++++++++++ tests/test_compute_async.py | 44 +++++++++++++ 5 files changed, 282 insertions(+), 2 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index ba94362..23f5e7c 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -1200,6 +1200,7 @@ async def poll( else: return res + async def poll_active( self, machine: str, @@ -1246,6 +1247,34 @@ async def poll_active( return ret + async def get_nodes( + self, + machine: str, + nodes: Optional[Sequence[str]] = None, + ) -> List[t.JobQueue]: + """Retrieves information about the compute nodes. + This call uses the `scontrol show nodes` command. + + :param machine: the machine name where the scheduler belongs to + :param nodes: specific compute nodes to query + :calls: GET `/compute/nodes` + + GET `/tasks/{taskid}` + """ + params = {} + if nodes: + params['nodes'] = ",".join(nodes) + + resp = await self._get_request( + endpoint="/compute/nodes", + additional_headers={"X-Machine-Name": machine}, + params=params, + ) + json_response = self._json_response([resp], 200) + t = ComputeTask(self, json_response["task_id"], [resp]) + dict_result = await t.poll_task("200") + return list(dict_result) + async def cancel(self, machine: str, job_id: str | int) -> str: """Cancels running job. This call uses the `scancel` command. diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py index 0ba12b1..5cde29f 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/BasicClient.py @@ -1049,6 +1049,7 @@ def poll( else: return res + def poll_active( self, machine: str, @@ -1082,6 +1083,36 @@ def poll_active( ) return list(dict_result.values()) + def get_nodes( + self, + machine: str, + nodes: Optional[Sequence[str]] = None, + ) -> List[t.JobQueue]: + """Retrieves information about the compute nodes. + This call uses the `scontrol show nodes` command. + + :param machine: the machine name where the scheduler belongs to + :param nodes: specific compute nodes to query + :calls: GET `/compute/nodes` + + GET `/tasks/{taskid}` + """ + params = {} + if nodes: + params['nodes'] = ','.join(nodes) + + resp = self._get_request( + endpoint="/compute/nodes", + additional_headers={"X-Machine-Name": machine}, + params=params, + ) + self._current_method_requests.append(resp) + json_response = self._json_response(self._current_method_requests, 200) + dict_result = self._poll_tasks( + json_response["task_id"], "200", iter([1, 0.5, 0.25]) + ) + return list(dict_result) + def cancel(self, machine: str, job_id: str | int) -> str: """Cancels running job. This call uses the `scancel` command. diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index 48e366b..fdf9ff7 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -1292,6 +1292,54 @@ def poll_active( raise typer.Exit(code=1) +@app.command(rich_help_panel="Compute commands") +def get_nodes( + config_from_parent: str = typer.Option(None, + callback=config_parent_load_callback, + is_eager=True, + hidden=True + ), + system: str = typer.Option( + ..., "-s", "--system", help="The name of the system.", envvar="FIRECREST_SYSTEM" + ), + nodes: Optional[List[str]] = typer.Argument( + None, help="List of specific compute nodes to query." + ), + raw: bool = typer.Option(False, "--raw", help="Print unformatted."), +): + """Retrieves information about the compute nodes. + This call uses the `scontrol show nodes` command + """ + try: + results = client.get_nodes(system, nodes) + if raw: + console.print(result) + else: + parsed_results = [] + for item in results: + parsed_item = {} + for key, value in item.items(): + if isinstance(value, list): + parsed_item[key] = ", ".join(value) + else: + parsed_item[key] = value + + parsed_results.append(parsed_item) + + table = create_table( + "Information about jobs in the queue", + parsed_results, + ("Name", "NodeName"), + ("Partitions", "Partitions"), + ("State", "State"), + ("Active Features", "ActiveFeatures"), + ) + console.print(table) + except Exception as e: + examine_exeption(e) + raise typer.Exit(code=1) + + @app.command(rich_help_panel="Compute commands") def cancel( config_from_parent: str = typer.Option(None, @@ -1312,8 +1360,8 @@ def cancel( raise typer.Exit(code=1) -@reservation_app.command(rich_help_panel="Reservation commands") -def list( +@reservation_app.command(name='list', rich_help_panel="Reservation commands") +def list_command( config_from_parent: str = typer.Option(None, callback=config_parent_load_callback, is_eager=True, diff --git a/tests/test_compute.py b/tests/test_compute.py index 9d9315e..3a2ba3d 100644 --- a/tests/test_compute.py +++ b/tests/test_compute.py @@ -123,6 +123,28 @@ def submit_path_handler(request: Request): ) +def nodes_request_handler(request: Request): + if not request.query_string or request.query_string == b'nodes=nid001': + ret = { + "success": "Task created", + "task_id": "nodes_info", + "task_url": "/tasks/nodes_info" + } + status_code = 200 + + if request.query_string == b'nodes=nidunknown': + ret = { + "success": "Task created", + "task_id": "info_unknown_node", + "task_url": "/tasks/info_unknown_node" + } + status_code = 200 + + return Response( + json.dumps(ret), status=status_code, content_type="application/json" + ) + + def submit_upload_handler(request: Request): if request.headers["Authorization"] != "Bearer VALID_TOKEN": return Response( @@ -845,6 +867,62 @@ def tasks_handler(request: Request): } status_code = 200 + if taskid == "nodes_info": + ret = { + "tasks": { + taskid: { + "created_at": "2024-04-16T09:47:06", + "data": [ + { + "ActiveFeatures": [ + "f7t" + ], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02", + ], + "State": [ + "IDLE" + ] + } + ], + "description": "Finished successfully", + "hash_id": "nodes_info", + "last_modify": "2024-04-16T09:47:06", + "service": "compute", + "status": "200", + "system": "cluster", + "task_id": "nodes_info", + "task_url": "/tasks/nodes_info", + "updated_at": "2024-04-16T09:47:06", + "user": "service-account-firecrest-sample" + } + } + } + status_code = 200 + + if taskid == "info_unknown_node": + ret = { + "tasks": { + taskid: { + "created_at": "2024-04-16T09:56:14", + "data": "Node nidunknown not found", + "description": "Finished with errors", + "hash_id": "info_unknown_node", + "last_modify": "2024-04-16T09:56:14", + "service": "compute", + "status": "400", + "system": "cluster", + "task_id": "info_unknown_node", + "task_url": "/tasks/info_unknown_node", + "updated_at": "2024-04-16T09:56:14", + "user": "service-account-firecrest-sample" + } + } + } + status_code = 400 + return Response( json.dumps(ret), status=status_code, content_type="application/json" ) @@ -876,6 +954,10 @@ def fc_server(httpserver): "/tasks", method="GET" ).respond_with_handler(tasks_handler) + httpserver.expect_request( + "/compute/nodes", method="GET" + ).respond_with_handler(nodes_request_handler) + return httpserver @@ -1216,6 +1298,17 @@ def test_poll_active(valid_client): ] +def test_cli_get_nodes(valid_credentials): + args = valid_credentials + ["get-nodes", "--system", "cluster1", "nid001"] + result = runner.invoke(cli.app, args=args) + stdout = common.clean_stdout(result.stdout) + assert result.exit_code == 0 + assert "Information about jobs in the queue" in stdout + assert "nid001" in stdout + assert "part01, part02" in stdout + assert "IDLE" in stdout + assert "f7t" in stdout + def test_cli_poll_active(valid_credentials): global queue_retry queue_retry = 0 @@ -1294,3 +1387,38 @@ def test_cancel_invalid_machine(valid_client): def test_cancel_invalid_client(invalid_client): with pytest.raises(firecrest.UnauthorizedException): invalid_client.cancel(machine="cluster1", job_id=35360071) + + +def test_get_nodes(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert valid_client.get_nodes(machine="cluster1") == response + + +def test_get_nodes_from_list(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert valid_client.get_nodes(machine="cluster1", nodes=["nid001"]) == response + + +def test_get_nodes_unknown(valid_client): + with pytest.raises(firecrest.FirecrestException): + valid_client.get_nodes(machine="cluster1", nodes=["nidunknown"]) diff --git a/tests/test_compute_async.py b/tests/test_compute_async.py index 861bc7f..c3861d8 100644 --- a/tests/test_compute_async.py +++ b/tests/test_compute_async.py @@ -92,6 +92,10 @@ def fc_server(httpserver): "/tasks", method="GET" ).respond_with_handler(basic_compute.tasks_handler) + httpserver.expect_request( + "/compute/nodes", method="GET" + ).respond_with_handler(basic_compute.nodes_request_handler) + return httpserver @@ -407,3 +411,43 @@ async def test_cancel_invalid_machine(valid_client): async def test_cancel_invalid_client(invalid_client): with pytest.raises(firecrest.UnauthorizedException): await invalid_client.cancel(machine="cluster1", job_id=35360071) + + +@pytest.mark.asyncio +async def test_get_nodes(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert await valid_client.get_nodes(machine="cluster1") == response + + +@pytest.mark.asyncio +async def test_get_nodes_from_list(valid_client): + response = [{ + "ActiveFeatures": ["f7t"], + "NodeName": "nid001", + "Partitions": [ + "part01", + "part02" + ], + "State": [ + "IDLE" + ] + }] + assert await valid_client.get_nodes(machine="cluster1", + nodes=["nid001"]) == response + + +@pytest.mark.asyncio +async def test_get_nodes_unknown(valid_client): + with pytest.raises(firecrest.FirecrestException): + await valid_client.get_nodes(machine="cluster1", + nodes=["nidunknown"]) From 3f8c1dfbc46f8aac154ca08045b7f9f0d3f0c5ff Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 17 Apr 2024 18:20:34 +0200 Subject: [PATCH 2/7] remove newline --- firecrest/AsyncClient.py | 1 - 1 file changed, 1 deletion(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 23f5e7c..765f1da 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -1200,7 +1200,6 @@ async def poll( else: return res - async def poll_active( self, machine: str, From 871f06dd7915cf20f1ebdeabdb627b70baa04500 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 17 Apr 2024 18:23:47 +0200 Subject: [PATCH 3/7] fix typo --- firecrest/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index fdf9ff7..4f40a59 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -1313,7 +1313,7 @@ def get_nodes( try: results = client.get_nodes(system, nodes) if raw: - console.print(result) + console.print(results:) else: parsed_results = [] for item in results: From 9fed90685be9685fc31721c8a753579abab775d8 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 17 Apr 2024 18:24:40 +0200 Subject: [PATCH 4/7] fix typo --- firecrest/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index 4f40a59..fa75629 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -1313,7 +1313,7 @@ def get_nodes( try: results = client.get_nodes(system, nodes) if raw: - console.print(results:) + console.print(results) else: parsed_results = [] for item in results: From cbbfb1a1c95ac2ad3c5b52e0c8336fc53be80969 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 17 Apr 2024 18:32:08 +0200 Subject: [PATCH 5/7] fix type --- firecrest/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index fa75629..6af4d27 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -1322,7 +1322,7 @@ def get_nodes( if isinstance(value, list): parsed_item[key] = ", ".join(value) else: - parsed_item[key] = value + parsed_item[key] = str(value) parsed_results.append(parsed_item) From 7a96dc9908c02c2917c57271680a043c94dfab92 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Tue, 23 Apr 2024 14:07:24 +0200 Subject: [PATCH 6/7] fix comments --- firecrest/AsyncClient.py | 10 ++++++---- firecrest/BasicClient.py | 11 ++++++----- firecrest/cli/__init__.py | 2 +- tests/test_compute.py | 6 +++--- tests/test_compute_async.py | 6 +++--- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 765f1da..2f1db9b 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -1246,7 +1246,7 @@ async def poll_active( return ret - async def get_nodes( + async def nodes( self, machine: str, nodes: Optional[Sequence[str]] = None, @@ -1259,10 +1259,12 @@ async def get_nodes( :calls: GET `/compute/nodes` GET `/tasks/{taskid}` + + .. warning:: This is available only for FirecREST>=1.16.0 """ params = {} if nodes: - params['nodes'] = ",".join(nodes) + params["nodes"] = ",".join(nodes) resp = await self._get_request( endpoint="/compute/nodes", @@ -1271,8 +1273,8 @@ async def get_nodes( ) json_response = self._json_response([resp], 200) t = ComputeTask(self, json_response["task_id"], [resp]) - dict_result = await t.poll_task("200") - return list(dict_result) + result = await t.poll_task("200") + return result async def cancel(self, machine: str, job_id: str | int) -> str: """Cancels running job. diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py index 5cde29f..949ed15 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/BasicClient.py @@ -1049,7 +1049,6 @@ def poll( else: return res - def poll_active( self, machine: str, @@ -1083,7 +1082,7 @@ def poll_active( ) return list(dict_result.values()) - def get_nodes( + def nodes( self, machine: str, nodes: Optional[Sequence[str]] = None, @@ -1096,10 +1095,12 @@ def get_nodes( :calls: GET `/compute/nodes` GET `/tasks/{taskid}` + + .. warning:: This is available only for FirecREST>=1.16.0 """ params = {} if nodes: - params['nodes'] = ','.join(nodes) + params["nodes"] = ",".join(nodes) resp = self._get_request( endpoint="/compute/nodes", @@ -1108,10 +1109,10 @@ def get_nodes( ) self._current_method_requests.append(resp) json_response = self._json_response(self._current_method_requests, 200) - dict_result = self._poll_tasks( + result = self._poll_tasks( json_response["task_id"], "200", iter([1, 0.5, 0.25]) ) - return list(dict_result) + return result def cancel(self, machine: str, job_id: str | int) -> str: """Cancels running job. diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index 6af4d27..3305b65 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -1311,7 +1311,7 @@ def get_nodes( This call uses the `scontrol show nodes` command """ try: - results = client.get_nodes(system, nodes) + results = client.nodes(system, nodes) if raw: console.print(results) else: diff --git a/tests/test_compute.py b/tests/test_compute.py index 3a2ba3d..cbcfb1b 100644 --- a/tests/test_compute.py +++ b/tests/test_compute.py @@ -1401,7 +1401,7 @@ def test_get_nodes(valid_client): "IDLE" ] }] - assert valid_client.get_nodes(machine="cluster1") == response + assert valid_client.nodes(machine="cluster1") == response def test_get_nodes_from_list(valid_client): @@ -1416,9 +1416,9 @@ def test_get_nodes_from_list(valid_client): "IDLE" ] }] - assert valid_client.get_nodes(machine="cluster1", nodes=["nid001"]) == response + assert valid_client.nodes(machine="cluster1", nodes=["nid001"]) == response def test_get_nodes_unknown(valid_client): with pytest.raises(firecrest.FirecrestException): - valid_client.get_nodes(machine="cluster1", nodes=["nidunknown"]) + valid_client.nodes(machine="cluster1", nodes=["nidunknown"]) diff --git a/tests/test_compute_async.py b/tests/test_compute_async.py index c3861d8..6cb2e1a 100644 --- a/tests/test_compute_async.py +++ b/tests/test_compute_async.py @@ -426,7 +426,7 @@ async def test_get_nodes(valid_client): "IDLE" ] }] - assert await valid_client.get_nodes(machine="cluster1") == response + assert await valid_client.nodes(machine="cluster1") == response @pytest.mark.asyncio @@ -442,12 +442,12 @@ async def test_get_nodes_from_list(valid_client): "IDLE" ] }] - assert await valid_client.get_nodes(machine="cluster1", + assert await valid_client.nodes(machine="cluster1", nodes=["nid001"]) == response @pytest.mark.asyncio async def test_get_nodes_unknown(valid_client): with pytest.raises(firecrest.FirecrestException): - await valid_client.get_nodes(machine="cluster1", + await valid_client.nodes(machine="cluster1", nodes=["nidunknown"]) From ea9cdd00e9755301ef75d9860754a78f1ebfc7c3 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 24 Apr 2024 09:18:39 +0200 Subject: [PATCH 7/7] fix comments --- tests/test_compute_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_compute_async.py b/tests/test_compute_async.py index 6cb2e1a..c40b681 100644 --- a/tests/test_compute_async.py +++ b/tests/test_compute_async.py @@ -443,11 +443,11 @@ async def test_get_nodes_from_list(valid_client): ] }] assert await valid_client.nodes(machine="cluster1", - nodes=["nid001"]) == response + nodes=["nid001"]) == response @pytest.mark.asyncio async def test_get_nodes_unknown(valid_client): with pytest.raises(firecrest.FirecrestException): await valid_client.nodes(machine="cluster1", - nodes=["nidunknown"]) + nodes=["nidunknown"])