generated from canonical/starbase
-
Notifications
You must be signed in to change notification settings - Fork 12
/
lifecycle.py
431 lines (368 loc) · 16.2 KB
/
lifecycle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# This file is part of craft-application.
#
# Copyright 2023-2024 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY,
# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""craft-parts lifecycle integration."""
from __future__ import annotations
import contextlib
import os
import types
from typing import TYPE_CHECKING, Any
from craft_cli import emit
from craft_parts import (
Action,
ActionType,
Features,
LifecycleManager,
PartsError,
ProjectInfo,
Step,
StepInfo,
callbacks,
)
from craft_parts.errors import CallbackRegistrationError
from typing_extensions import override
from craft_application import errors, models, util
from craft_application.services import base
from craft_application.util import convert_architecture_deb_to_platform, repositories
if TYPE_CHECKING: # pragma: no cover
from pathlib import Path
from craft_application.application import AppMetadata
from craft_application.models import Project
from craft_application.services import ServiceFactory
ACTION_MESSAGES = types.MappingProxyType(
{
Step.PULL: types.MappingProxyType(
{
ActionType.RUN: "Pulling",
ActionType.RERUN: "Repulling",
ActionType.SKIP: "Skipping pull for",
ActionType.UPDATE: "Updating sources for",
}
),
Step.OVERLAY: types.MappingProxyType(
{
ActionType.RUN: "Overlaying",
ActionType.RERUN: "Re-overlaying",
ActionType.SKIP: "Skipping overlay for",
ActionType.UPDATE: "Updating overlay for",
ActionType.REAPPLY: "Reapplying",
}
),
Step.BUILD: types.MappingProxyType(
{
ActionType.RUN: "Building",
ActionType.RERUN: "Rebuilding",
ActionType.SKIP: "Skipping build for",
ActionType.UPDATE: "Updating build for",
}
),
Step.STAGE: types.MappingProxyType(
{
ActionType.RUN: "Staging",
ActionType.RERUN: "Restaging",
ActionType.SKIP: "Skipping stage for",
}
),
Step.PRIME: types.MappingProxyType(
{
ActionType.RUN: "Priming",
ActionType.RERUN: "Repriming",
ActionType.SKIP: "Skipping prime for",
}
),
}
)
def _get_parts_action_message(action: Action) -> str:
"""Get a user-readable message for a particular craft-parts action."""
message = f"{ACTION_MESSAGES[action.step][action.action_type]} {action.part_name}"
if action.reason:
return message + f" ({action.reason})"
return message
def _get_step(step_name: str) -> Step:
"""Get a lifecycle step by name."""
if step_name.lower() == "overlay" and not Features().enable_overlay:
raise RuntimeError("Invalid target step 'overlay'")
steps = Step.__members__
try:
return steps[step_name.upper()]
except KeyError:
raise RuntimeError(f"Invalid target step {step_name!r}") from None
class LifecycleService(base.ProjectService):
"""Create and manage the parts lifecycle.
:param app: An AppMetadata object containing metadata about the application.
:param project: The Project object that describes this project.
:param work_dir: The working directory for parts processing.
:param cache_dir: The cache directory for parts processing.
:param build_plan: The filtered build plan of platforms that are valid for
the running host.
:param lifecycle_kwargs: Additional keyword arguments are passed through to the
LifecycleManager on initialisation.
"""
def __init__(
self,
app: AppMetadata,
services: ServiceFactory,
*,
project: Project,
work_dir: Path | str,
cache_dir: Path | str,
build_plan: list[models.BuildInfo],
partitions: list[str] | None = None,
**lifecycle_kwargs: Any, # noqa: ANN401 - eventually used in an Any
) -> None:
super().__init__(app, services, project=project)
self._work_dir = work_dir
self._cache_dir = cache_dir
self._build_plan = build_plan
self._partitions = partitions
self._manager_kwargs = lifecycle_kwargs
self._lcm: LifecycleManager = None # type: ignore[assignment]
@override
def setup(self) -> None:
"""Initialize the LifecycleManager with previously-set arguments."""
self._lcm = self._init_lifecycle_manager()
callbacks.register_post_step(self.post_prime, step_list=[Step.PRIME])
def _init_lifecycle_manager(self) -> LifecycleManager:
"""Create and return the Lifecycle manager.
An application may override this method if needed if the lifecycle
manager needs to be called differently.
"""
emit.debug(f"Initialising lifecycle manager in {self._work_dir}")
emit.trace(f"Lifecycle: {repr(self)}")
# Note: we fallback to the host's architecture here if the build plan
# is empty just to be able to create the LifecycleManager; this will
# correctly fail later on when run() is called (but not necessarily when
# something else like clean() is called).
# We also use the host arch if the build-for is 'all'
if self._build_plan and self._build_plan[0].build_for != "all":
build_for = self._build_plan[0].build_for
else:
build_for = util.get_host_architecture()
if self._project.package_repositories:
self._manager_kwargs["package_repositories"] = (
self._project.package_repositories
)
pvars: dict[str, str] = {}
for var in self._app.project_variables:
pvars[var] = getattr(self._project, var) or ""
self._project_vars = pvars
emit.debug(f"Project vars: {self._project_vars}")
emit.debug(f"Adopting part: {self._project.adopt_info}")
try:
return LifecycleManager(
{"parts": self._project.parts},
application_name=self._app.name,
arch=convert_architecture_deb_to_platform(build_for),
cache_dir=self._cache_dir,
work_dir=self._work_dir,
ignore_local_sources=self._app.source_ignore_patterns,
parallel_build_count=self._get_parallel_build_count(),
project_vars_part_name=self._project.adopt_info,
project_vars=self._project_vars,
track_stage_packages=True,
partitions=self._partitions,
**self._manager_kwargs,
)
except PartsError as err:
raise errors.PartsLifecycleError.from_parts_error(err) from err
@property
def prime_dir(self) -> Path:
"""The path to the prime directory."""
return self._lcm.project_info.dirs.prime_dir
@property
def project_info(self) -> ProjectInfo:
"""The lifecycle's ProjectInfo."""
return self._lcm.project_info
def get_pull_assets(self, *, part_name: str) -> dict[str, Any] | None:
"""Obtain the part's pull state assets.
:param part_name: The name of the part to get assets from.
:return: The dictionary of the part's pull assets, or None if no state found.
"""
return self._lcm.get_pull_assets(part_name=part_name)
def get_primed_stage_packages(self, *, part_name: str) -> list[str] | None:
"""Obtain the list of primed stage packages.
:param part_name: The name of the part to get primed stage packages from.
:return: The sorted list of primed stage packages, or None if no state found.
"""
return self._lcm.get_primed_stage_packages(part_name=part_name)
def run(self, step_name: str | None, part_names: list[str] | None = None) -> None:
"""Run the lifecycle manager for the parts."""
target_step = _get_step(step_name) if step_name else None
# Now that we are actually going to run we can validate what we're building.
_validate_build_plan(self._build_plan)
try:
if self._project.package_repositories:
emit.trace("Installing package repositories")
repositories.install_package_repositories(
self._project.package_repositories,
self._lcm,
local_keys_path=self._get_local_keys_path(),
)
with contextlib.suppress(CallbackRegistrationError):
callbacks.register_configure_overlay(
repositories.install_overlay_repositories
)
if target_step:
emit.trace(f"Planning {step_name} for {part_names or 'all parts'}")
actions = self._lcm.plan(target_step, part_names=part_names)
else:
actions = []
emit.progress("Initialising lifecycle")
with self._lcm.action_executor() as aex:
for action in actions:
message = _get_parts_action_message(action)
emit.progress(message)
with emit.open_stream() as stream:
aex.execute(action, stdout=stream, stderr=stream)
except PartsError as err:
raise errors.PartsLifecycleError.from_parts_error(err) from err
except RuntimeError as err:
raise RuntimeError(f"Parts processing internal error: {err}") from err
except OSError as err:
raise errors.PartsLifecycleError.from_os_error(err) from err
except Exception as err:
raise errors.PartsLifecycleError(f"Unknown error: {str(err)}") from err
def post_prime(self, step_info: StepInfo) -> bool:
"""Perform any necessary post-lifecycle modifications to the prime directory.
This method should be idempotent and meet the requirements for a craft-parts
callback. It is added as a post-prime callback during the setup phase.
NOTE: This is not guaranteed to run in any particular order if other callbacks
are added to the prime step.
"""
if step_info.step != Step.PRIME:
raise RuntimeError(f"Post-prime hook called after step: {step_info.step}")
return False
def clean(self, part_names: list[str] | None = None) -> None:
"""Remove lifecycle artifacts.
:param part_names: The names of the parts to clean. If unspecified, all parts
will be cleaned.
"""
if part_names:
message = "Cleaning parts: " + ", ".join(part_names)
else:
message = "Cleaning all parts"
emit.progress(message)
self._lcm.clean(part_names=part_names)
@staticmethod
def previous_step_name(step_name: str) -> str | None:
"""Get the name of the step immediately previous to `step_name`.
Returns None if `step_name` is the first one (pull).
"""
step = _get_step(step_name)
previous_steps = step.previous_steps()
return previous_steps[-1].name.lower() if previous_steps else None
def __repr__(self) -> str:
work_dir = self._work_dir
cache_dir = self._cache_dir
plan = self._build_plan
return (
f"{self.__class__.__name__}({self._app!r}, {self._project!r}, "
f"{work_dir=}, {cache_dir=}, {plan=}, **{self._manager_kwargs!r})"
)
def _verify_parallel_build_count(
self, env_name: str, parallel_build_count: int | str
) -> int:
"""Verify the parallel build count is valid.
:param env_name: The name of the environment variable being checked.
:param parallel_build_count: The value of the variable.
:return: The parallel build count as an integer.
"""
try:
parallel_build_count = int(parallel_build_count)
except ValueError as err:
raise errors.InvalidParameterError(
env_name, str(os.environ[env_name])
) from err
# Ensure the value is valid positive integer
if parallel_build_count < 1:
raise errors.InvalidParameterError(env_name, str(parallel_build_count))
return parallel_build_count
def _get_parallel_build_count(self) -> int:
"""Get the number of parallel builds to run.
The parallel build count is determined by the first available of the
following environment variables in the order:
- <APP_NAME>_PARALLEL_BUILD_COUNT
- CRAFT_PARALLEL_BUILD_COUNT
- <APP_NAME>_MAX_PARALLEL_BUILD_COUNT
- CRAFT_MAX_PARALLEL_BUILD_COUNT
where the MAX_PARALLEL_BUILD_COUNT variables are dynamically compared to
the number of CPUs, and the smaller of the two is used.
If no environment variable is set, the CPU count is used.
If the CPU count is not available for some reason, 1 is used as a fallback.
"""
parallel_build_count = None
# fixed parallel build count environment variable
for env_name in [
(self._app.name + "_PARALLEL_BUILD_COUNT").upper(),
"CRAFT_PARALLEL_BUILD_COUNT",
]:
if os.environ.get(env_name):
parallel_build_count = self._verify_parallel_build_count(
env_name, os.environ[env_name]
)
emit.debug(
f"Using parallel build count of {parallel_build_count} "
f"from environment variable {env_name!r}"
)
break
# CPU count related max parallel build count environment variable
if parallel_build_count is None:
cpu_count = os.cpu_count() or 1
for env_name in [
(self._app.name + "_MAX_PARALLEL_BUILD_COUNT").upper(),
"CRAFT_MAX_PARALLEL_BUILD_COUNT",
]:
if os.environ.get(env_name):
parallel_build_count = min(
cpu_count,
self._verify_parallel_build_count(
env_name, os.environ[env_name]
),
)
emit.debug(
f"Using parallel build count of {parallel_build_count} "
f"from environment variable {env_name!r}"
)
break
# Default to CPU count if no max environment variable is set
if parallel_build_count is None:
parallel_build_count = cpu_count
emit.debug(
f"Using parallel build count of {parallel_build_count} "
"from CPU count"
)
return parallel_build_count
def _get_local_keys_path(self) -> Path | None:
"""Return a directory with public keys for package-repositories.
This default implementation does not support local keys; it should be
overridden by subclasses that do.
"""
return None
def _validate_build_plan(build_plan: list[models.BuildInfo]) -> None:
"""Check that the build plan has exactly 1 compatible build info."""
if not build_plan:
raise errors.EmptyBuildPlanError
if len(build_plan) > 1:
raise errors.MultipleBuildsError
build_base = build_plan[0].base
host_base = util.get_host_base()
if build_base.version == "devel":
# If the build base is "devel", we don't try to match the specific
# version as that is a moving target; Just ensure the systems are the
# same.
if build_base.name != host_base.name:
raise errors.IncompatibleBaseError(host_base, build_base)
elif build_base != host_base:
raise errors.IncompatibleBaseError(host_base, build_base)