diff --git a/src/cli/main/__main__.py b/src/cli/main/__main__.py index a595345c..fa0ddba9 100644 --- a/src/cli/main/__main__.py +++ b/src/cli/main/__main__.py @@ -1,3 +1,4 @@ +# pylint: disable=too-many-lines from contextlib import ( suppress, ) @@ -7,9 +8,11 @@ import io import json import operator +import os from os import ( environ, getcwd, + getlogin, makedirs, remove, ) @@ -33,6 +36,9 @@ import rich.text import shlex import shutil +from socket import ( + gethostname, +) import subprocess # nosec import sys import tempfile @@ -250,6 +256,15 @@ def _clone_src_git_init(head: str) -> None: raise SystemExit(out) +def _clone_src_git_rev_parse(head: str, rev: str) -> str: + cmd = ["git", "-C", head, "rev-parse", rev] + out, stdout, _ = _run_outputs(cmd, stderr=None) + if out != 0: + raise SystemExit(out) + + return next(iter(stdout.decode().splitlines()), "HEAD") + + def _clone_src_git_fetch(head: str, remote: str, rev: str) -> None: depth = _if(GIT_DEPTH >= 1, f"--depth={GIT_DEPTH}") cmd = ["git", "-C", head, "fetch", *depth, remote, f"{rev}:{rev}"] @@ -379,6 +394,45 @@ def _nix_build( ] +def _nix_hashes(*paths: str) -> List[str]: + cmd = [ + f"{__NIX_STABLE__}/bin/nix-store", + "--query", + "--hash", + *paths, + ] + out, stdout, _ = _run_outputs(cmd, stderr=None) + if out != 0: + raise SystemExit(out) + + return stdout.decode().splitlines() + + +def _nix_build_requisites(path: str) -> List[Tuple[str, str]]: + """Answer the question: what do I need to build `out`.""" + cmd = [f"{__NIX_STABLE__}/bin/nix-store", "--query", "--deriver", path] + out, stdout, _ = _run_outputs(cmd, stderr=None) + if out != 0: + raise SystemExit(out) + + cmd = [ + f"{__NIX_STABLE__}/bin/nix-store", + "--query", + "--requisites", + "--include-outputs", + *stdout.decode().splitlines(), + ] + out, stdout, _ = _run_outputs(cmd, stderr=None) + if out != 0: + raise SystemExit(out) + + requisites: List[str] = stdout.decode().splitlines() + + hashes: List[str] = _nix_hashes(*requisites) + + return list(zip(requisites, hashes)) + + def _get_head(src: str) -> str: # Checkout repository HEAD into a temporary directory # This is nice for reproducibility and security, @@ -775,9 +829,11 @@ def cli(args: List[str]) -> None: args, attr = _cli_get_args_and_attr(args, config.attrs, src) out: str = join(MAKES_DIR, f"out{attr.replace('/', '-')}") + provenance: str = join(MAKES_DIR, f"provenance{attr.replace('/', '-')}") code = _cli_build(attr, config, head, out, src) if code == 0: + write_provenance(args, head, out, provenance, src) cache_push(config.cache, out) execute_action(args[3:], head, out) @@ -862,6 +918,76 @@ def cache_push(cache: List[Dict[str, str]], out: str) -> None: return +def _get_sys_id() -> str: + with suppress(AttributeError): + uname = os.uname() + return f"{uname.nodename}-{uname.sysname}-{uname.machine}" + + with suppress(OSError): + return gethostname() + + return "unknown" + + +def _get_usr() -> str: + with suppress(OSError): + return getlogin() + + return "unknown" + + +def write_provenance( + args: List[str], + head: str, + out: str, + provenance: str, + src: str, +) -> None: + attestation: Dict[str, Any] = {} + attestation["_type"] = "https://in-toto.io/Statement/v0.1" + attestation["predicateType"] = "https://slsa.dev/provenance/v0.2" + + attestation["predicate"] = {} + attestation["predicate"]["builder"] = {} + attestation["predicate"]["builder"]["id"] = f"{_get_usr()}@{_get_sys_id()}" + attestation["predicate"]["buildType"] = ( + f"https://fluidattacks.com/Attestations/Makes@{VERSION}", + ) + attestation["predicate"]["invocation"] = {} + attestation["predicate"]["invocation"]["configSource"] = { + "uri": f"git+https://{src}", + "digest": {"sha1": _clone_src_git_rev_parse(head, "HEAD")}, + "entrypoint": args[0], + } + attestation["predicate"]["invocation"]["parameters"] = args[1:] + attestation["predicate"]["invocation"]["environment"] = { + key: "" for key in environ + } + attestation["predicate"]["metadata"] = {} + attestation["predicate"]["metadata"]["completeness"] = {} + attestation["predicate"]["metadata"]["completeness"]["environment"] = True + attestation["predicate"]["metadata"]["completeness"]["materials"] = True + attestation["predicate"]["metadata"]["completeness"]["parameters"] = True + attestation["predicate"]["metadata"]["reproducible"] = True + attestation["predicate"]["materials"] = [ + { + "uri": requisite, + "hash": dict([hash_.split(":")]), # type: ignore + } + for requisite, hash_ in _nix_build_requisites(out) + ] + + attestation["subject"] = [ + { + "uri": out, + "hash": dict([_nix_hashes(out)[0].split(":")]), # type: ignore + } + ] + + with open(provenance, encoding="utf-8", mode="w+") as attestation_file: + json.dump(attestation, attestation_file, indent=2, sort_keys=True) + + def main() -> None: try: cli(sys.argv)