Skip to content

Commit

Permalink
[EXPERIMENT] Rqd containerized frame (#1546)
Browse files (browse the repository at this point in the history)
Signed-off-by: Diego Tavares <dtavares@imageworks.com>
  • Loading branch information
DiegoTavares authored Oct 17, 2024
1 parent 7582d1a commit 4e365d4
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 48 deletions.
8 changes: 2 additions & 6 deletions rqd/rqd.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,5 @@ DOCKER_IMAGE=""
RUN_ON_DOCKER=False

[docker.mounts]
MCP="type=bind,source=/mcp,target=/mcp,bind-propagation=slave"
NET="type=bind,source=/net,target=/net,bind-propagation=slave"
TMP="type=bind,source=/tmp,target=/tmp,bind-propagation=slave"
SCRATCH="type=bind,source=/scratch,target=/scratch,bind-propagation=slave"
LIMITS="type=bind,source=/etc/security/limits.d/,target=/etc/security/limits.d/,bind-propagation=slave"
FUSE="type=bind,source=/dev/fuse,target=/dev/fuse,bind-propagation=shared"
TEMP=type:bind,source:/tmp,target:/tmp,bind-propagation:slave
NET=type:bind,source:/net,target:/net,bind-propagation:slave
28 changes: 18 additions & 10 deletions rqd/rqd/rqconstants.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@
import docker.models
import docker.types

# rqd needs to run as root to be able to run docker
RQD_UID = 0
RQD_GID = 0

def parse_mount(mount_str):
"""
Parse mount definitions similar to a docker run command into a docker
Expand All @@ -255,21 +259,25 @@ def parse_mount(mount_str):
# bind-propagation defaults to None as only type=bind accepts it
mount_dict["bind-propagation"] = None
for item in mount_str.split(","):
key, value = item.split("=")
mount_dic[key.strip()] = value.strip()
return mount_dic
key, value = item.split(":")
mount_dict[key.strip()] = value.strip()
return mount_dict

DOCKER_IMAGE = config.get(__docker_config, "DOCKER_IMAGE")
# Parse values under the category docker.mounts into Mount objects
mounts = config.options(__docker_mounts)
for mount_name in mounts:
mount_str = config.get(__docker_mounts, mount_name)
mount_dic = parse_mount(mount_str)
mount = docker.types.Mount(mount_dic["target"],
mount_dic["source"],
type=mount_dic["type"],
propagation=mount_dic["bind-propagation"])
DOCKER_MOUNTS.append(mount)
try:
mount_str = config.get(__docker_mounts, mount_name)
mount_dict = parse_mount(mount_str)
mount = docker.types.Mount(mount_dict["target"],
mount_dict["source"],
type=mount_dict["type"],
propagation=mount_dict["bind-propagation"])
DOCKER_MOUNTS.append(mount)
except KeyError as e:
logging.exception("Failed to create Mount for key=%s, value=%s",
mount_name, mount_str)

# pylint: disable=broad-except
except Exception as e:
Expand Down
74 changes: 42 additions & 32 deletions rqd/rqd/rqcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __init__(self, optNimbyoff=False):
self.docker_mounts = []
self.docker_image = "Invalid"
if rqd.rqconstants.RUN_ON_DOCKER:
# pylint: disable=import-outside-toplevel
import docker
self.docker_client = docker.from_env()
self.docker_image = rqd.rqconstants.DOCKER_IMAGE
Expand Down Expand Up @@ -940,7 +941,8 @@ def runDocker(self):
#
# image = self.rqCore.docker_images.get(runFrame.os)
# if image is None:
# raise RuntimeError("rqd not configured to run an image for this frame OS: %s", runFrame.os)
# raise RuntimeError("rqd not configured to run an
# image for this frame OS: %s", runFrame.os)
image = self.rqCore.docker_image

self.__createEnvVariables()
Expand All @@ -961,35 +963,41 @@ def runDocker(self):
tempCommand += [runFrame.command]

# Print PID before executing
command = ["sh", "-c", "echo '$$'; exec " + " ".join(tempCommand)]
command = ["sh", "-c", "echo $$; exec " + " ".join(tempCommand)]

client = self.rqCore.docker_client
container = client.containers.run(image=image,
detach=True,
environment=self.frameEnv,
working_dir=self.rqCore.machine.getTempPath(),
mounts=self.rqCore.docker_mounts,
privileged=True,
remove=True,
pid_mode="host",
stderr=True,
hostname=self.frameEnv["jobhost"],
entrypoint=command)

log_stream = container.logs(stream=True)
# CMD prints the process PID before executing the actual command
frameInfo.pid = int(next(log_stream))

if not self.rqCore.updateRssThread.is_alive():
self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL,
self.rqCore.updateRss)
self.rqCore.updateRssThread.start()

for line in log_stream:
self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP)

output = container.wait()
returncode = output["StatusCode"]
try:
container = client.containers.run(image=image,
detach=True,
environment=self.frameEnv,
working_dir=self.rqCore.machine.getTempPath(),
mounts=self.rqCore.docker_mounts,
privileged=True,
remove=True,
pid_mode="host",
stderr=True,
hostname=self.frameEnv["jobhost"],
entrypoint=command,
user=runFrame.uid)

log_stream = container.logs(stream=True)
# The entrypoint shell echoes its own PID ($$) as the first log line before exec'ing the actual command
frameInfo.pid = int(next(log_stream))

if not self.rqCore.updateRssThread.is_alive():
self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL,
self.rqCore.updateRss)
self.rqCore.updateRssThread.start()

for line in log_stream:
self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP)

output = container.wait()
returncode = output["StatusCode"]
# pylint: disable=broad-except
except Exception:
returncode = 1
logging.exception("Failed to launch frame container")

# Find exitStatus and exitSignal
if returncode < 0:
Expand Down Expand Up @@ -1128,6 +1136,7 @@ def run(self):
log.info("Monitor frame started for frameId=%s", self.frameId)

runFrame = self.runFrame
run_on_docker = self.rqCore.docker_client is not None

# pylint: disable=too-many-nested-blocks
try:
Expand All @@ -1146,9 +1155,10 @@ def run(self):
rqd.rqutil.checkAndCreateUser(runFrame.user_name,
runFrame.uid,
runFrame.gid)
# Do everything as launching user:
runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID
rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid)
if not run_on_docker:
# Do everything as launching user:
runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID
rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid)

# Setup frame logging
try:
Expand All @@ -1165,7 +1175,7 @@ def run(self):
# Store frame in cache and register servant
self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo)

if platform.system() == "Linux" and self.rqCore.docker_client is not None:
if run_on_docker:
self.runDocker()
elif platform.system() == "Linux":
self.runLinux()
Expand Down
4 changes: 4 additions & 0 deletions rqd/rqd/rqutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ def checkAndCreateUser(username, uid=None, gid=None):
cmd.append(username)
log.info("Frame's username not found on host. Adding user with: %s", cmd)
subprocess.check_call(cmd)
# pylint: disable=broad-except
except Exception:
logging.exception("useradd failed to add user: %s. User possibly already exists.",
username)
finally:
permissionsLow()

Expand Down

0 comments on commit 4e365d4

Please sign in to comment.