Skip to content

Commit

Permalink
[pycue] Add interactive functions to reboot hosts using the api (Acad…
Browse files Browse the repository at this point in the history
…emySoftwareFoundation#1419)

* Add interactive functions to reboot hosts using the api

New api functions can be used to interactively reboot hosts on the farm and monitor their state. New functions:
 * `rebootFarmSafely`: uses the hostSearch module to find hosts and requests a reboot for each of them, in groups whose size is set by the arg `group_size`. If `start_time` is provided, the function only targets hosts that have `boot_time` < `start_time` (i.e. hosts that have not rebooted since `start_time`).
 * `monitorRebootFarm`: similar to `rebootFarmSafely`, but instead of requesting a reboot, it continuously monitors the hosts matched by the query and only finishes when all of them have been rebooted.

* Update pycue/opencue/wrappers/host.py

Signed-off-by: Diego Tavares <dtavares@imageworks.com>

---------

Signed-off-by: Diego Tavares <dtavares@imageworks.com>
  • Loading branch information
DiegoTavares authored and n-jay committed Jul 26, 2024
1 parent 723f03a commit da7104f
Showing 1 changed file with 174 additions and 0 deletions.
174 changes: 174 additions & 0 deletions pycue/opencue/wrappers/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import enum
import os
import time
from builtins import input
import grpc

from opencue import Cuebot
from opencue import util
from opencue import search
from opencue.compiled_proto import comment_pb2
from opencue.compiled_proto import host_pb2
import opencue.wrappers.comment
Expand Down Expand Up @@ -217,6 +220,177 @@ def setThreadMode(self, mode):
self.stub.SetThreadMode(host_pb2.HostSetThreadModeRequest(host=self.data, mode=mode),
timeout=Cuebot.Timeout)

@staticmethod
def hasHostRebootedSince(host, start_time):
    """
    Tell whether a host is still pending a reboot relative to `start_time`.

    NOTE: despite the method name, a True result means the host has NOT
    finished rebooting yet; the reboot helpers in this class collect the
    hosts for which this returns True as the ones they are "still waiting on".

    :param host: Host wrapper
    :param start_time: epoch time
    :return: True if the host state is REBOOT_WHEN_IDLE, or if the host is UP
             with a bootTime() earlier than start_time; False otherwise
    """
    # A reboot has been requested but the host has not gone down yet.
    waiting_for_idle = host_pb2.HardwareState.Value('REBOOT_WHEN_IDLE')
    if host.state() == waiting_for_idle:
        return True

    # The host is running, but its last boot predates the reference time.
    running = host_pb2.HardwareState.Value('UP')
    return host.state() == running and host.bootTime() < start_time

@staticmethod
def rebootFarmSafely(group_size, start_time=None, **options):
    """Requests an idle reboot for nodes found using the
    options search criteria. For safety, workstations are always excluded.

    Method can only be called interactively: it prints the list of selected
    hosts and blocks on `input()` until the user answers "Y" (proceed) or
    "n" (abort).

    For example::

        from opencue.wrappers.host import Host
        Host.rebootFarmSafely(5, alloc=["lax.ngp"])

    Uses the hostSearch module to find hosts and requests a reboot for each
    of them in groups defined by the arg `group_size`. Only hosts whose
    `bootTime()` is earlier than `start_time` are asked to reboot. Once at
    most 20% of a group is still pending, the next group is started; the
    hosts left behind are awaited at the end.

    Possible hostSearch args:
       - host: host names - list
       - match: host name substring match - str
       - regex: a host name search by regular expression - str
       - id: a search by unique id - str
       - alloc: search by allocation. - list

    :param group_size: Reboot hosts in groups limited to this size (must be >= 1)
    :param start_time: epoch time; only hosts with boot_time<start_time will be
                       targeted. Defaults to the current time when None
    :param options: HostSearch params
    :raises ValueError: if group_size is smaller than 1
    :raises grpc.RpcError: on rpc failures other than UNAVAILABLE/CANCELLED, or
                           when retrying a reboot request fails again
    """
    # Fail before touching the farm or prompting the user; a zero step would
    # otherwise only blow up in range() after the confirmation.
    if group_size < 1:
        raise ValueError("group_size must be a positive integer")

    hosts = search.HostSearch.byOptions(**options)
    # Workstations are marked as Nimby, ignore those.
    hosts = [host for host in hosts if not host.isNimbyEnabled() and host.isUp()]
    check_hosts_interval_seconds = 30
    rpc_retry_interval_seconds = 5
    # Error codes treated as a short server-side outage worth retrying.
    # pylint: disable=no-member
    transient_codes = (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.CANCELLED)
    # pylint: enable=no-member

    if not hosts:
        print("No hosts found")
        return

    print("Rebooting hosts:\n%s.\n\n"
          "Are you sure hosts on this list are safe to be rebooted? [Y/n]" %
          [str(host.name()) for host in hosts])
    # Only an exact "Y" or "n" is accepted; anything else asks again.
    while True:
        choice = input()
        if choice == "Y":
            break
        if choice == "n":
            return

    groups = [hosts[x:x+group_size] for x in range(0, len(hosts), group_size)]

    if start_time is None:
        start_time = int(time.time())

    def _query_pending(host_names):
        """Returns (number of hosts found, names of hosts still pending reboot),
        retrying through short server outages."""
        while True:
            try:
                found = search.HostSearch.byName(host_names)
                pending = [str(host.name()) for host in found
                           if Host.hasHostRebootedSince(host, start_time)]
                return len(found), pending
            except grpc.RpcError as rpc_error:
                if rpc_error.code() not in transient_codes:
                    raise
                # Back off instead of hammering a server that is unavailable.
                time.sleep(rpc_retry_interval_seconds)

    host_still_rebooting = []
    for group in groups:
        group_host_names = []
        for host in group:
            if host.bootTime() < start_time:
                try:
                    host.rebootWhenIdle()
                except grpc.RpcError as rpc_error:
                    if rpc_error.code() not in transient_codes:
                        raise
                    # Wait and Retry (a second failure propagates to the caller)
                    time.sleep(rpc_retry_interval_seconds)
                    host.rebootWhenIdle()
                # NOTE(review): only hosts that were asked to reboot are tracked,
                # matching the "Requesting reboot for" message below - confirm.
                group_host_names.append(str(host.name()))

        if not group_host_names:
            # Nothing in this group needed a reboot; never run a name search
            # with an empty list of names.
            continue

        print("Requesting reboot for %s. Waiting for completion." % group_host_names)

        # Wait until 80% of a group gets upgraded to jump to the next group
        while True:
            found_count, rebooting_hosts = _query_pending(group_host_names)
            if len(rebooting_hosts) <= found_count * 0.2:
                host_still_rebooting.extend(rebooting_hosts)
                print("Moving to the next group, left behind %s..\n" % rebooting_hosts)
                break
            print("Still waiting on %s.." % rebooting_hosts)
            time.sleep(check_hosts_interval_seconds)

    # Wait for remaining hosts to finish (20% not awaited for)
    if host_still_rebooting:
        print("Waiting on remaining hosts to reboot")
        while True:
            _, rebooting_hosts = _query_pending(host_still_rebooting)
            if not rebooting_hosts:
                break
            # Narrow the next query down to the hosts that are still pending.
            host_still_rebooting = rebooting_hosts
            print("Still waiting on %s.." % rebooting_hosts)
            time.sleep(check_hosts_interval_seconds)
    print("Finished rebooting requested hosts")

@staticmethod
def monitorRebootFarm(start_time, **options):
    """Monitor hosts rebooting

    Finds hosts with the given search options (excluding Nimby hosts and
    hosts that are not up at the time of the call) and polls them every 30
    seconds until none of them is pending a reboot. No reboot is requested
    by this method.

    Possible args:
       - host: host names - list
       - match: host name substring match - str
       - regex: a host name search by regular expression - str
       - id: a search by unique id - str
       - alloc: search by allocation. - list

    :param start_time: epoch time; a host counts as pending while it is UP with
                       boot_time<start_time or is waiting to reboot when idle.
                       Defaults to the current time when None
    :param options: HostSearch params
    :raises grpc.RpcError: on rpc failures other than UNAVAILABLE/CANCELLED
    """
    hosts = search.HostSearch.byOptions(**options)
    # Workstations are marked as Nimby, ignore those.
    hosts_names = [host.name() for host in hosts if not host.isNimbyEnabled() and host.isUp()]
    check_hosts_interval_seconds = 30
    rpc_retry_interval_seconds = 5
    # Error codes treated as a short server-side outage worth retrying.
    # pylint: disable=no-member
    transient_codes = (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.CANCELLED)
    # pylint: enable=no-member

    # Check the filtered list: the raw search may be non-empty while every
    # match was excluded, and a name search must not run with no names.
    if not hosts_names:
        print("No hosts found")
        return

    if start_time is None:
        start_time = int(time.time())

    # Poll until none of the monitored hosts is pending a reboot
    while True:
        try:
            hosts = search.HostSearch.byName(hosts_names)
            rebooting_hosts = [str(host.name()) for host in hosts
                               if Host.hasHostRebootedSince(host, start_time)]
        except grpc.RpcError as rpc_error:
            # Ignore rpc unavailable to survive short service outages on the server side
            if rpc_error.code() not in transient_codes:
                raise
            # Back off instead of hammering a server that is unavailable.
            time.sleep(rpc_retry_interval_seconds)
            continue

        if not rebooting_hosts:
            break
        print("Still waiting on %s hosts: %s.." % (len(rebooting_hosts), rebooting_hosts))
        time.sleep(check_hosts_interval_seconds)

    print("Finished rebooting hosts")

def id(self):
"""Returns the id of the host.
Expand Down

0 comments on commit da7104f

Please sign in to comment.