Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

Commit

Permalink
Make file I/O operations async
Browse files Browse the repository at this point in the history
  • Loading branch information
locriandev committed Nov 28, 2022
1 parent efd1cf8 commit d0d3908
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions doozerlib/cli/release_gen_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import List, Optional, Tuple, Dict, NamedTuple, Iterable, Set, Any, Callable
from unittest.mock import MagicMock

import aiofiles
import click
import yaml
from doozerlib import rhcos
Expand Down Expand Up @@ -144,7 +145,7 @@ def payload_imagestream_namespace_and_name(base_namespace: str, base_imagestream
return namespace, name


def modify_and_replace_api_object(api_obj: oc.APIObject, modifier_func: Callable[[oc.APIObject], Any], backup_file_path: Path, dry_run: bool):
async def modify_and_replace_api_object(api_obj: oc.APIObject, modifier_func: Callable[[oc.APIObject], Any], backup_file_path: Path, dry_run: bool):
"""
Receives an APIObject, archives the current state of that object, runs a modifying method on it,
archives the new state of the object, and then tries to replace the object on the
Expand All @@ -156,8 +157,11 @@ def modify_and_replace_api_object(api_obj: oc.APIObject, modifier_func: Callable
states of the object before triggering the update.
:param dry_run: Write archive files but do not actually update the imagestream.
"""
with backup_file_path.joinpath(f"replacing-{api_obj.kind()}.{api_obj.namespace()}.{api_obj.name()}.before-modify.json").open(mode="w+") as backup_file:
backup_file.write(api_obj.as_json(indent=4))

filepath = backup_file_path.joinpath(
f"replacing-{api_obj.kind()}.{api_obj.namespace()}.{api_obj.name()}.before-modify.json")
async with aiofiles.open(filepath, mode='w+') as backup_file:
await backup_file.write(api_obj.as_json(indent=4))

modifier_func(api_obj)
api_obj_model = api_obj.model
Expand All @@ -174,8 +178,10 @@ def modify_and_replace_api_object(api_obj: oc.APIObject, modifier_func: Callable

api_obj_model.pop("status")

with backup_file_path.joinpath(f"replacing-{api_obj.kind()}.{api_obj.namespace()}.{api_obj.name()}.after-modify.json").open(mode="w+") as backup_file:
backup_file.write(api_obj.as_json(indent=4))
filepath = backup_file_path.joinpath(
f"replacing-{api_obj.kind()}.{api_obj.namespace()}.{api_obj.name()}.after-modify.json")
async with aiofiles.open(filepath, mode="w+") as backup_file:
await backup_file.write(api_obj.as_json(indent=4))

if not dry_run:
api_obj.replace()
Expand Down Expand Up @@ -595,7 +601,8 @@ async def sync_payloads(self):
tasks.append(self.mirror_payload_content(arch, payload_entries))
for private_mode in self.privacy_modes:
self.logger.info(f"Building payload files for architecture: {arch}; private: {private_mode}")
self.generate_specific_payload_imagestreams(arch, private_mode, payload_entries, multi_specs)
tasks.append(
self.generate_specific_payload_imagestreams(arch, private_mode, payload_entries, multi_specs))
await asyncio.gather(*tasks)

if self.apply_multi_arch:
Expand Down Expand Up @@ -626,9 +633,9 @@ async def mirror_payload_content(self, arch: str, payload_entries: Dict[str, Pay
# it is what we update in the imagestreams that defines whether the image will be
# part of a public vs private release.
src_dest_path = self.output_path.joinpath(f"src_dest.{arch}")
with src_dest_path.open("w+", encoding="utf-8") as out_file:
async with aiofiles.open(src_dest_path, mode="w+", encoding="utf-8") as out_file:
for dest_pullspec, src_pullspec in mirror_src_for_dest.items():
out_file.write(f"{src_pullspec}={dest_pullspec}\n")
await out_file.write(f"{src_pullspec}={dest_pullspec}\n")

if self.apply or self.apply_multi_arch:
self.logger.info(f"Mirroring images from {str(src_dest_path)}")
Expand All @@ -638,7 +645,7 @@ async def mirror_payload_content(self, arch: str, payload_entries: Dict[str, Pay
except asyncio.TimeoutError:
pass

def generate_specific_payload_imagestreams(
async def generate_specific_payload_imagestreams(
self, arch: str, private_mode: bool,
payload_entries: Dict[str, PayloadEntry],
# Map [is_private] -> [tag_name] -> [arch] -> PayloadEntry
Expand Down Expand Up @@ -673,26 +680,27 @@ def generate_specific_payload_imagestreams(
imagestream_namespace, imagestream_name = payload_imagestream_namespace_and_name(
*self.base_imagestream, arch, private_mode)

self.write_imagestream_artifact_file(imagestream_namespace, imagestream_name, istags, incomplete_payload_update)
await self.write_imagestream_artifact_file(imagestream_namespace, imagestream_name, istags, incomplete_payload_update)
if self.apply:
self.apply_arch_imagestream(imagestream_namespace, imagestream_name, istags, incomplete_payload_update)
await self.apply_arch_imagestream(imagestream_namespace, imagestream_name, istags, incomplete_payload_update)

def write_imagestream_artifact_file(self, imagestream_namespace: str, imagestream_name: str, istags: List[Dict], incomplete_payload_update):
async def write_imagestream_artifact_file(self, imagestream_namespace: str, imagestream_name: str, istags: List[Dict], incomplete_payload_update):
"""Write the yaml file for the imagestream."""
filename = f"updated-tags-for.{imagestream_namespace}.{imagestream_name}{'-partial' if incomplete_payload_update else ''}.yaml"
with self.output_path.joinpath(filename).open("w+", encoding="utf-8") as out_file:
filename = f"updated-tags-for.{imagestream_namespace}.{imagestream_name}" \
f"{'-partial' if incomplete_payload_update else ''}.yaml"
async with aiofiles.open(self.output_path.joinpath(filename), mode="w+", encoding="utf-8") as out_file:
istream_spec = PayloadGenerator.build_payload_imagestream(
self.runtime,
imagestream_name, imagestream_namespace,
istags, self.assembly_issues
)
yaml.safe_dump(istream_spec, out_file, indent=2, default_flow_style=False)
await out_file.write(yaml.safe_dump(istream_spec, indent=2, default_flow_style=False))

def apply_arch_imagestream(self, imagestream_namespace: str, imagestream_name: str, istags: List[Dict], incomplete_payload_update: bool):
async def apply_arch_imagestream(self, imagestream_namespace: str, imagestream_name: str, istags: List[Dict], incomplete_payload_update: bool):
"""Orchestrate the update and tag removal for one arch imagestream in the OCP cluster."""
with oc.project(imagestream_namespace):
istream_apiobj = self.ensure_imagestream_apiobj(imagestream_name)
pruning_tags, adding_tags = self.apply_imagestream_update(istream_apiobj, istags, incomplete_payload_update)
pruning_tags, adding_tags = await self.apply_imagestream_update(istream_apiobj, istags, incomplete_payload_update)

if pruning_tags:
self.logger.warning(f"The following tag names are no longer part of the release and will be pruned in {imagestream_namespace}:{imagestream_name}: {pruning_tags}")
Expand Down Expand Up @@ -727,7 +735,7 @@ def ensure_imagestream_apiobj(self, imagestream_name):
})
return oc.selector(f"imagestream/{imagestream_name}").object()

def apply_imagestream_update(self, istream_apiobj, istags: List[Dict], incomplete_payload_update: bool) -> Tuple[Set[str], Set[str]]:
async def apply_imagestream_update(self, istream_apiobj, istags: List[Dict], incomplete_payload_update: bool) -> Tuple[Set[str], Set[str]]:
"""Apply changes for one imagestream object to the OCP cluster."""
# gather diffs between old and new, indicating removal or addition
pruning_tags: Set[str] = set()
Expand Down Expand Up @@ -766,7 +774,7 @@ def update_single_arch_istags(apiobj: oc.APIObject):

apiobj.model.spec.tags = new_istags

modify_and_replace_api_object(istream_apiobj, update_single_arch_istags, self.output_path, self.moist_run)
await modify_and_replace_api_object(istream_apiobj, update_single_arch_istags, self.output_path, self.moist_run)
return pruning_tags, adding_tags

async def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]]):
Expand Down Expand Up @@ -838,7 +846,7 @@ async def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Di
self.logger.info(f"The final pull_spec for the multi release payload is: {final_multi_pullspec}")

with oc.project(imagestream_namespace):
self.apply_multi_imagestream_update(final_multi_pullspec, imagestream_name, multi_release_istag)
await self.apply_multi_imagestream_update(final_multi_pullspec, imagestream_name, multi_release_istag)

def get_multi_release_names(self, private_mode: bool) -> Tuple[str, str]:
"""
Expand Down Expand Up @@ -931,10 +939,8 @@ async def create_multi_manifest_list(
output_pullspec: str = f"{self.full_component_repo()}:sha256-{manifest_list_hash.hexdigest()}"

# write the manifest list to a file and push it to the registry.
with component_manifest_path.open(mode="w+") as ml:
yaml.safe_dump(
dict(image=output_pullspec, manifests=manifests),
stream=ml, default_flow_style=False)
async with aiofiles.open(component_manifest_path, mode="w+") as ml:
await ml.write(yaml.safe_dump(dict(image=output_pullspec, manifests=manifests), default_flow_style=False))
await exectools.cmd_assert_async(f"manifest-tool push from-spec {str(component_manifest_path)}", retries=3)

# we are pushing a new manifest list, so return its sha256 based pullspec
Expand All @@ -954,8 +960,8 @@ async def create_multi_release_image(
# Write the imagestream to a file ("oc adm release new" can read from a file instead of
# openshift cluster API)
multi_release_is_path: Path = self.output_path.joinpath(f"{imagestream_name}-release-imagestream.yaml")
with multi_release_is_path.open(mode="w+") as mf:
yaml.safe_dump(multi_release_is, mf)
async with aiofiles.open(multi_release_is_path, mode="w+") as mf:
await mf.write(yaml.safe_dump(multi_release_is))

arch_release_dests: Dict[str, str] = dict() # This will map arch names to a release payload pullspec we create for that arch (i.e. based on the arch's CVO image)
tasks = []
Expand Down Expand Up @@ -1000,16 +1006,16 @@ async def create_multi_release_manifest_list(
}

release_payload_ml_path = self.output_path.joinpath(f"{imagestream_name}.manifest-list.yaml")
with release_payload_ml_path.open(mode="w+") as ml:
yaml.safe_dump(ml_dict, stream=ml, default_flow_style=False)
async with aiofiles.open(release_payload_ml_path, mode="w+") as ml:
await ml.write(yaml.safe_dump(ml_dict, default_flow_style=False))

# Construct the top level manifest list release payload
await exectools.cmd_assert_async(f"manifest-tool push from-spec {str(release_payload_ml_path)}", retries=3)
# if we are actually pushing a manifest list, then we should derive a sha256 based pullspec
sha = await find_manifest_list_sha(multi_release_dest)
return exchange_pullspec_tag_for_shasum(multi_release_dest, sha)

def apply_multi_imagestream_update(self, final_multi_pullspec: str, imagestream_name: str, multi_release_istag: str):
async def apply_multi_imagestream_update(self, final_multi_pullspec: str, imagestream_name: str, multi_release_istag: str):
"""
If running with assembly==stream, updates release imagestream with a new imagestream tag for the nightly. Older
nightlies are pruned from the release imagestream.
Expand Down Expand Up @@ -1067,7 +1073,8 @@ def add_multi_nightly_release(obj: oc.APIObject):
})
return True

modify_and_replace_api_object(multi_art_latest_is, add_multi_nightly_release, self.output_path, self.moist_run)
await modify_and_replace_api_object(
multi_art_latest_is, add_multi_nightly_release, self.output_path, self.moist_run)


class PayloadGenerator:
Expand Down

0 comments on commit d0d3908

Please sign in to comment.