Skip to content

Commit

Permalink
Add new initial command stage to mutex all reads of the installspace. (
Browse files Browse the repository at this point in the history
…#391)

* Add new initial command stage to mutex all reads of the installspace.

* Changed load_env -> loadenv.

This better aligns the function name with other similar
helpers in catkin_tools.jobs.utils.

* Drop job env from clean jobs.

* Drop unused 'import os'
  • Loading branch information
mikepurvis authored and wjwwood committed Jan 28, 2017
1 parent 426af25 commit 5262dec
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 44 deletions.
6 changes: 1 addition & 5 deletions catkin_tools/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

from __future__ import print_function

import os
import traceback

from itertools import tee
Expand Down Expand Up @@ -59,9 +58,6 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
# Jobs start occuping a jobserver job
occupying_job = True

# Load environment for this job
job_env = job.getenv(os.environ)

# Execute each stage of this job
for stage in job.stages:
# Logger reference in this scope for error reporting
Expand Down Expand Up @@ -102,7 +98,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
while True:
try:
# Update the environment for this stage (respects overrides)
stage.update_env(job_env)
stage.update_env(job.env)

# Get the logger
protocol_type = stage.logger_factory(verb, job.jid, stage.label, event_queue, log_path)
Expand Down
7 changes: 2 additions & 5 deletions catkin_tools/execution/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Job(object):

"""A Job is a series of operations, each of which is considered a "stage" of the job."""

def __init__(self, jid, deps, env_loader, stages, continue_on_failure=True):
def __init__(self, jid, deps, env, stages, continue_on_failure=True):
"""
jid: Unique job identifier
deps: Dependencies (in terms of other jid's)
Expand All @@ -33,7 +33,7 @@ def __init__(self, jid, deps, env_loader, stages, continue_on_failure=True):
"""
self.jid = jid
self.deps = deps
self.env_loader = env_loader
self.env = env
self.stages = stages
self.continue_on_failure = continue_on_failure

Expand All @@ -48,6 +48,3 @@ def all_deps_succeeded(self, completed_jobs):
def any_deps_failed(self, completed_jobs):
"""Return True if any dependencies which have been completed have failed."""
return any([not completed_jobs.get(dep_id, True) for dep_id in self.deps])

def getenv(self, env):
return self.env_loader(env)
21 changes: 18 additions & 3 deletions catkin_tools/jobs/catkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from .commands.make import MAKE_EXEC

from .utils import copyfiles
from .utils import get_env_loader
from .utils import loadenv
from .utils import makedirs
from .utils import rmfiles

Expand Down Expand Up @@ -350,10 +350,23 @@ def create_catkin_build_job(context, package, package_path, dependencies, force_
install_space = context.package_install_space(package)
# Package metadata path
metadata_path = context.package_metadata_path(package)
# Environment dictionary for the job, which will be built
# up by the executions in the loadenv stage.
job_env = dict(os.environ)

# Create job stages
stages = []

# Load environment for job.
stages.append(FunctionStage(
'loadenv',
loadenv,
locked_resource='installspace',
job_env=job_env,
package=package,
context=context
))

# Create package build space
stages.append(FunctionStage(
'mkdir',
Expand Down Expand Up @@ -478,7 +491,7 @@ def create_catkin_build_job(context, package, package_path, dependencies, force_
return Job(
jid=package.name,
deps=dependencies,
env_loader=get_env_loader(package, context),
env=job_env,
stages=stages)


Expand All @@ -499,6 +512,8 @@ def create_catkin_clean_job(
build_space = context.package_build_space(package)
# Package metadata path
metadata_path = context.package_metadata_path(package)
# Environment dictionary for the job, empty for a clean job
job_env = {}

# Remove installed files
if clean_install:
Expand Down Expand Up @@ -566,7 +581,7 @@ def create_catkin_clean_job(
return Job(
jid=package.name,
deps=dependencies,
env_loader=get_env_loader(package, context),
env=job_env,
stages=stages)


Expand Down
21 changes: 18 additions & 3 deletions catkin_tools/jobs/cmake.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from .commands.make import MAKE_EXEC

from .utils import copyfiles
from .utils import get_env_loader
from .utils import loadenv
from .utils import makedirs
from .utils import rmfiles

Expand Down Expand Up @@ -217,6 +217,9 @@ def create_cmake_build_job(context, package, package_path, dependencies, force_c
build_space = context.package_build_space(package)
# Package metadata path
metadata_path = context.package_metadata_path(package)
# Environment dictionary for the job, which will be built
# up by the executions in the loadenv stage.
job_env = dict(os.environ)

# Get actual staging path
dest_path = context.package_dest_path(package)
Expand All @@ -225,6 +228,16 @@ def create_cmake_build_job(context, package, package_path, dependencies, force_c
# Create job stages
stages = []

# Load environment for job.
stages.append(FunctionStage(
'loadenv',
loadenv,
locked_resource='installspace',
job_env=job_env,
package=package,
context=context
))

# Create package build space
stages.append(FunctionStage(
'mkdir',
Expand Down Expand Up @@ -321,7 +334,7 @@ def create_cmake_build_job(context, package, package_path, dependencies, force_c
return Job(
jid=package.name,
deps=dependencies,
env_loader=get_env_loader(package, context),
env=job_env,
stages=stages)


Expand All @@ -340,6 +353,8 @@ def create_cmake_clean_job(
build_space = context.package_build_space(package)
# Package metadata path
metadata_path = context.package_metadata_path(package)
# Environment dictionary for the job, empty for a clean job
job_env = {}

stages = []

Expand Down Expand Up @@ -381,7 +396,7 @@ def create_cmake_clean_job(
return Job(
jid=package.name,
deps=dependencies,
env_loader=get_env_loader(package, context),
env=job_env,
stages=stages)


Expand Down
45 changes: 19 additions & 26 deletions catkin_tools/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,25 @@ def get_env_loaders(package, context):
return sources


def get_env_loader(package, context):
"""This function returns a function object which extends a base environment
based on a set of environments to load."""

def load_env(base_env):
# Copy the base environment to extend
job_env = dict(base_env)
# Get the paths to the env loaders
env_loader_paths = get_env_loaders(package, context)
# If DESTDIR is set, set _CATKIN_SETUP_DIR as well
if context.destdir is not None:
job_env['_CATKIN_SETUP_DIR'] = context.package_dest_path(package)

for env_loader_path in env_loader_paths:
# print(' - Loading resultspace env from: {}'.format(env_loader_path))
resultspace_env = get_resultspace_environment(
os.path.split(env_loader_path)[0],
base_env=job_env,
quiet=True,
cached=context.use_env_cache,
strict=False)
job_env.update(resultspace_env)

return job_env

return load_env
def loadenv(logger, event_queue, job_env, package, context):
# Get the paths to the env loaders
env_loader_paths = get_env_loaders(package, context)
# If DESTDIR is set, set _CATKIN_SETUP_DIR as well
if context.destdir is not None:
job_env['_CATKIN_SETUP_DIR'] = context.package_dest_path(package)

for env_loader_path in env_loader_paths:
if logger:
logger.out('Loading environment from: {}'.format(env_loader_path))
resultspace_env = get_resultspace_environment(
os.path.split(env_loader_path)[0],
base_env=job_env,
quiet=True,
cached=context.use_env_cache,
strict=False)
job_env.update(resultspace_env)

return 0


def makedirs(logger, event_queue, path):
Expand Down
5 changes: 3 additions & 2 deletions catkin_tools/verbs/catkin_build/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

import catkin_tools.execution.job_server as job_server

from catkin_tools.jobs.utils import get_env_loader
from catkin_tools.jobs.utils import loadenv

from catkin_tools.metadata import find_enclosing_workspace
from catkin_tools.metadata import get_metadata
Expand Down Expand Up @@ -229,7 +229,8 @@ def print_build_env(context, package_name):
# Load the environment used by this package for building
for pth, pkg in workspace_packages.items():
if pkg.name == package_name:
environ = get_env_loader(pkg, context)(os.environ)
environ = dict(os.environ)
loadenv(None, None, environ, pkg, context)
print(format_env_dict(environ))
return 0
print('[build] Error: Package `{}` not in workspace.'.format(package_name),
Expand Down

0 comments on commit 5262dec

Please sign in to comment.