Skip to content

Commit

Permalink
reverse code back to yesterday
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Mar 6, 2024
1 parent 1c0012e commit 6349738
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci_cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ jobs:
run: docker compose -f ./docker-compose.yml --env-file ${{ env.ENV_FILE }} up -d operandi-mongodb

- name: wait starting of RabbitMQ and MongoDB
run: sleep 25
run: sleep 45

- name: run utils tests
run: |
export $(shell sed 's/=.*//' ${{ env.ENV_FILE }})
pytest tests/tests_utils/test_*.py
- name: run broker tests
run: |
Expand All @@ -138,11 +143,6 @@ jobs:
export $(shell sed 's/=.*//' ${{ env.ENV_FILE }})
pytest tests/tests_harvester/test_*.py
- name: run utils tests
run: |
export $(shell sed 's/=.*//' ${{ env.ENV_FILE }})
pytest tests/tests_utils/test_*.py
- name: run integration tests
run: |
export $(shell sed 's/=.*//' ${{ env.ENV_FILE }})
Expand Down
5 changes: 5 additions & 0 deletions src/broker/operandi_broker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ def prepare_and_trigger_slurm_job(
# The deadline of the regular jobs - 48 hours
job_deadline_time = "48:00:00"

# Recreate the transfer connection for each workflow job submission.
# This is required because of various connection failures, such as timeouts
# and the paramiko transport not reporting its state properly.
self.hpc_io_transfer = HPCTransfer()
self.log.info("HPC transfer connection renewed successfully.")
hpc_batch_script_path = self.hpc_io_transfer.put_batch_script(batch_script_id="submit_workflow_job.sh")

try:
Expand Down
12 changes: 10 additions & 2 deletions src/utils/operandi_utils/hpc/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,17 @@ def create_sftp_client(self) -> SFTPClient:
def is_transport_responsive(transport: Transport) -> bool:
if not transport:
return False
if not transport.is_authenticated():
return False
if not transport.is_active():
return False
if not transport.is_alive():
return False
try:
# Sometimes is_active() returns false-positives, hence the extra check
transport.send_ignore()
# Nevertheless, this check still yields false positives; see:
# https://github.com/paramiko/paramiko/issues/2026
return True
except EOFError:
return False
Expand Down Expand Up @@ -207,6 +213,7 @@ def reconnect_if_required(
hpc_host = self.last_used_hpc_host
if not proxy_host:
proxy_host = self.last_used_proxy_host

if not self.is_ssh_connection_still_responsive(self.ssh_proxy_client):
self.log.warning("The connection to proxy server is not responsive, trying to open a new connection")
self.ssh_proxy_client = self.connect_to_proxy_server(host=proxy_host, port=proxy_port)
Expand All @@ -232,8 +239,9 @@ def recreate_sftp_if_required(
hpc_host=hpc_host, hpc_port=hpc_port,
proxy_host=proxy_host, proxy_port=proxy_port
)
self.log.warning("Creating a new SFTP client")
self.create_sftp_client()
if not self.is_sftp_still_responsive():
self.log.warning("The SFTP client is not responsive, trying to create a new SFTP client")
self.create_sftp_client()

def create_ssh_connection_to_hpc_by_iteration(self, try_times: int = 1) -> None:
while try_times > 0:
Expand Down
24 changes: 16 additions & 8 deletions src/utils/operandi_utils/hpc/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,22 @@ def trigger_slurm_job(
command += f" {ws_pages_amount}"
command += "'"

self.log.info(f"About to execute a blocking command: {command}")
output, err, return_code = self.execute_blocking(command)
self.log.info(f"Command output: {output}")
self.log.info(f"Command err: {err}")
self.log.info(f"Command return code: {return_code}")
slurm_job_id = output[0].strip('\n').split(' ')[-1]
self.log.info(f"Slurm job id: {slurm_job_id}")
assert int(slurm_job_id)
slurm_job_id = "UNASSIGNED"
try_times = 3
while try_times > 0:
try:
self.log.info(f"About to execute a blocking command: {command}")
output, err, return_code = self.execute_blocking(command)
self.log.info(f"Command output: {output}")
self.log.info(f"Command err: {err}")
self.log.info(f"Command return code: {return_code}")
slurm_job_id = output[0].strip('\n').split(' ')[-1]
self.log.info(f"Slurm job id: {slurm_job_id}")
assert int(slurm_job_id)
break
except Exception:
try_times = try_times - 1
continue
return slurm_job_id

def check_slurm_job_state(self, slurm_job_id: str, tries: int = 3, wait_time: int = 2) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/utils/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='operandi_utils',
version='2.10.2',
version='2.10.3',
description='OPERANDI - Utils',
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
Expand Down

0 comments on commit 6349738

Please sign in to comment.