From 63497381cb343d0d4e4b99217296a1c47e88b355 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 6 Mar 2024 12:28:09 +0100 Subject: [PATCH] reverse code back to yesterday --- .github/workflows/ci_cd.yml | 12 ++++++------ src/broker/operandi_broker/worker.py | 5 +++++ src/utils/operandi_utils/hpc/connector.py | 12 ++++++++++-- src/utils/operandi_utils/hpc/executor.py | 24 +++++++++++++++-------- src/utils/setup.py | 2 +- 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/ci_cd.yml index af298b9e..bd7a7fa8 100644 --- a/.github/workflows/ci_cd.yml +++ b/.github/workflows/ci_cd.yml @@ -121,7 +121,12 @@ jobs: run: docker compose -f ./docker-compose.yml --env-file ${{ env.ENV_FILE }} up -d operandi-mongodb - name: wait starting of RabbitMQ and MongoDB - run: sleep 25 + run: sleep 45 + + - name: run utils tests + run: | + export $(shell sed 's/=.*//' ${{ env.ENV_FILE }}) + pytest tests/tests_utils/test_*.py - name: run broker tests run: | @@ -138,11 +143,6 @@ jobs: export $(shell sed 's/=.*//' ${{ env.ENV_FILE }}) pytest tests/tests_harvester/test_*.py - - name: run utils tests - run: | - export $(shell sed 's/=.*//' ${{ env.ENV_FILE }}) - pytest tests/tests_utils/test_*.py - - name: run integration tests run: | export $(shell sed 's/=.*//' ${{ env.ENV_FILE }}) diff --git a/src/broker/operandi_broker/worker.py b/src/broker/operandi_broker/worker.py index 53b43431..150f2102 100644 --- a/src/broker/operandi_broker/worker.py +++ b/src/broker/operandi_broker/worker.py @@ -206,6 +206,11 @@ def prepare_and_trigger_slurm_job( # The deadline of the regular jobs - 48 hours job_deadline_time = "48:00:00" + # Recreate the transfer connection for each workflow job submission + # This is required due to all kinds of nasty connection failures - timeouts, + # paramiko transport not reporting properly, etc. 
+ self.hpc_io_transfer = HPCTransfer() + self.log.info("HPC transfer connection renewed successfully.") hpc_batch_script_path = self.hpc_io_transfer.put_batch_script(batch_script_id="submit_workflow_job.sh") try: diff --git a/src/utils/operandi_utils/hpc/connector.py b/src/utils/operandi_utils/hpc/connector.py index 669d8dd5..c0b0fc6d 100644 --- a/src/utils/operandi_utils/hpc/connector.py +++ b/src/utils/operandi_utils/hpc/connector.py @@ -163,11 +163,17 @@ def create_sftp_client(self) -> SFTPClient: def is_transport_responsive(transport: Transport) -> bool: if not transport: return False + if not transport.is_authenticated(): + return False if not transport.is_active(): return False + if not transport.is_alive(): + return False try: # Sometimes is_active() returns false-positives, hence the extra check transport.send_ignore() + # Nevertheless, send_ignore() can still report false positives, see: + # https://github.com/paramiko/paramiko/issues/2026 return True except EOFError: return False @@ -207,6 +213,7 @@ def reconnect_if_required( hpc_host = self.last_used_hpc_host if not proxy_host: proxy_host = self.last_used_proxy_host + if not self.is_ssh_connection_still_responsive(self.ssh_proxy_client): self.log.warning("The connection to proxy server is not responsive, trying to open a new connection") self.ssh_proxy_client = self.connect_to_proxy_server(host=proxy_host, port=proxy_port) @@ -232,8 +239,9 @@ def recreate_sftp_if_required( hpc_host=hpc_host, hpc_port=hpc_port, proxy_host=proxy_host, proxy_port=proxy_port ) - self.log.warning("Creating a new SFTP client") - self.create_sftp_client() + if not self.is_sftp_still_responsive(): + self.log.warning("The SFTP client is not responsive, trying to create a new SFTP client") + self.create_sftp_client() def create_ssh_connection_to_hpc_by_iteration(self, try_times: int = 1) -> None: while try_times > 0: diff --git a/src/utils/operandi_utils/hpc/executor.py b/src/utils/operandi_utils/hpc/executor.py index c8a2ff5f..dd005d8c 100644 
--- a/src/utils/operandi_utils/hpc/executor.py +++ b/src/utils/operandi_utils/hpc/executor.py @@ -102,14 +102,22 @@ def trigger_slurm_job( command += f" {ws_pages_amount}" command += "'" - self.log.info(f"About to execute a blocking command: {command}") - output, err, return_code = self.execute_blocking(command) - self.log.info(f"Command output: {output}") - self.log.info(f"Command err: {err}") - self.log.info(f"Command return code: {return_code}") - slurm_job_id = output[0].strip('\n').split(' ')[-1] - self.log.info(f"Slurm job id: {slurm_job_id}") - assert int(slurm_job_id) + slurm_job_id = "UNASSIGNED" + try_times = 3 + while try_times > 0: + try: + self.log.info(f"About to execute a blocking command: {command}") + output, err, return_code = self.execute_blocking(command) + self.log.info(f"Command output: {output}") + self.log.info(f"Command err: {err}") + self.log.info(f"Command return code: {return_code}") + slurm_job_id = output[0].strip('\n').split(' ')[-1] + self.log.info(f"Slurm job id: {slurm_job_id}") + assert int(slurm_job_id) + break + except Exception: + try_times = try_times - 1 + continue return slurm_job_id def check_slurm_job_state(self, slurm_job_id: str, tries: int = 3, wait_time: int = 2) -> str: diff --git a/src/utils/setup.py b/src/utils/setup.py index bfdaa9e7..d1df8b39 100644 --- a/src/utils/setup.py +++ b/src/utils/setup.py @@ -5,7 +5,7 @@ setup( name='operandi_utils', - version='2.10.2', + version='2.10.3', description='OPERANDI - Utils', long_description=open('README.md').read(), long_description_content_type='text/markdown',