Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unify the image spec hash function #2593

Merged
merged 30 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from flytekit.core.tracker import TrackedInstance, extract_task_module
from flytekit.core.utils import _get_container_definition, _serialize_pod_spec, timeit
from flytekit.extras.accelerators import BaseAccelerator
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec, _calculate_deduped_hash_from_image_spec
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec
from flytekit.loggers import logger
from flytekit.models import task as _task_model
from flytekit.models.security import Secret, SecurityContext
Expand Down Expand Up @@ -285,7 +285,7 @@ def get_registerable_container_image(img: Optional[Union[str, ImageSpec]], cfg:
:return:
"""
if isinstance(img, ImageSpec):
image = cfg.find_image(_calculate_deduped_hash_from_image_spec(img))
image = cfg.find_image(img.id)
image_name = image.full if image else None
if not image_name:
ImageBuildEngine.build(img)
Expand Down
2 changes: 1 addition & 1 deletion flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def create_docker_context(image_spec: ImageSpec, tmp_dir: Path):

uv_python_install_command = UV_PYTHON_INSTALL_COMMAND_TEMPLATE.substitute(PIP_EXTRA=pip_extra_args)

env_dict = {"PYTHONPATH": "/root", _F_IMG_ID: image_spec.image_name()}
env_dict = {"PYTHONPATH": "/root", _F_IMG_ID: image_spec.id}

if image_spec.env:
env_dict.update(image_spec.env)
Expand Down
235 changes: 107 additions & 128 deletions flytekit/image_spec/image_spec.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import base64
import copy
import dataclasses
import hashlib
import os
import pathlib
import re
import typing
from abc import abstractmethod
from dataclasses import asdict, dataclass
from functools import lru_cache
from functools import cached_property, lru_cache
from importlib import metadata
from typing import Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -91,37 +92,90 @@ def __post_init__(self):
]
for parameter in parameters_str_list:
attr = getattr(self, parameter)
parameter_is_None = attr is None
parameter_is_none = attr is None
parameter_is_list_string = isinstance(attr, list) and all(isinstance(v, str) for v in attr)
if not (parameter_is_None or parameter_is_list_string):
if not (parameter_is_none or parameter_is_list_string):
error_msg = f"{parameter} must be a list of strings or None"
raise ValueError(error_msg)

@cached_property
def id(self) -> str:
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
Calculate a unique hash as the ID for the ImageSpec, and it will be used to
1. Identify the imageSpec in the ImageConfig in the serialization context.
2. Check if the current container image in the pod is built from this image spec in `is_container()`.

ImageConfig:
- deduced abc: flyteorg/flytekit:123
- deduced xyz: flyteorg/flytekit:456

:return: a unique identifier of the ImageSpec
"""
# Only get the non-None values in the ImageSpec to ensure the hash is consistent across different Flytekit versions.
image_spec_dict = asdict(self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})
image_spec_bytes = image_spec_dict.__str__().encode("utf-8")
return base64.urlsafe_b64encode(hashlib.md5(image_spec_bytes).digest()).decode("ascii").rstrip("=")

def __hash__(self):
    """Hash the spec through its ``id`` string, so the hash follows whatever ``id`` returns."""
    return hash(self.id)

@property
def tag(self) -> str:
"""
Calculate a hash from the image spec. The hash will be the tag of the image.
We will also read the content of the requirement file and the source root to calculate the hash.
Therefore, it will generate different hash if new dependencies are added or the source code is changed.
"""

# copy the image spec to avoid modifying the original image spec. otherwise, the hash will be different.
spec = copy.deepcopy(self)
if isinstance(spec.base_image, ImageSpec):
spec.base_image = spec.base_image.image_name()

if self.source_root:
from flytekit.tools.fast_registration import compute_digest
from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore

ignore = IgnoreGroup(self.source_root, [GitIgnore, DockerIgnore, StandardIgnore])
digest = compute_digest(self.source_root, ignore.is_ignored)
spec.source_root = digest

if spec.requirements:
spec.requirements = hashlib.sha1(pathlib.Path(spec.requirements).read_bytes().strip()).hexdigest()
# won't rebuild the image if we change the registry_config path
spec.registry_config = None
image_spec_dict = asdict(spec, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})
image_spec_bytes = image_spec_dict.__str__().encode("utf-8")
tag = (
base64.urlsafe_b64encode(hashlib.md5(image_spec_bytes).digest())
.decode("ascii")
.rstrip("=")
.replace("-", "_")
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this works:

Suggested change
image_spec_dict = asdict(spec, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})
image_spec_bytes = image_spec_dict.__str__().encode("utf-8")
tag = (
base64.urlsafe_b64encode(hashlib.md5(image_spec_bytes).digest())
.decode("ascii")
.rstrip("=")
.replace("-", "_")
)
tag = spec.id.replace("-", "_")

At this point, can we have .replace("-", "_") in .id ?

if self.tag_format:
return self.tag_format.format(spec_hash=tag)
return tag

def image_name(self) -> str:
"""Full image name with tag."""
image_name = self._image_name()
image_name = f"{self.name}:{self.tag}"
if self.registry:
image_name = f"{self.registry}/{image_name}"
try:
return ImageBuildEngine._IMAGE_NAME_TO_REAL_NAME[image_name]
except KeyError:
return image_name

def _image_name(self) -> str:
"""Construct full image name with tag."""
tag = calculate_hash_from_image_spec(self)
if self.tag_format:
tag = self.tag_format.format(spec_hash=tag)

container_image = f"{self.name}:{tag}"
if self.registry:
container_image = f"{self.registry}/{container_image}"
return container_image

def is_container(self) -> bool:
"""
Check if the current container image in the pod is built from current image spec.
:return: True if the current container image in the pod is built from current image spec, False otherwise.
"""
from flytekit.core.context_manager import ExecutionState, FlyteContextManager

state = FlyteContextManager.current_context().execution_state
if state and state.mode and state.mode != ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION:
return os.environ.get(_F_IMG_ID) == self.image_name()
return os.environ.get(_F_IMG_ID) == self.id
return True

def exist(self) -> Optional[bool]:
Expand Down Expand Up @@ -153,17 +207,18 @@ def exist(self) -> Optional[bool]:
except ImageNotFound:
return False
except Exception as e:
tag = calculate_hash_from_image_spec(self)
# if docker engine is not running locally, use requests to check if the image exists.
if "localhost:" in self.registry:
if self.registry is None:
container_registry = None
elif "localhost:" in self.registry:
container_registry = self.registry
elif self.registry and "/" in self.registry:
elif "/" in self.registry:
container_registry = self.registry.split("/")[0]
else:
# Assume the image is in docker hub if users don't specify a registry, such as ghcr.io, docker.io.
container_registry = DOCKER_HUB
if container_registry == DOCKER_HUB:
url = f"https://hub.docker.com/v2/repositories/{self.registry}/{self.name}/tags/{tag}"
url = f"https://hub.docker.com/v2/repositories/{self.registry}/{self.name}/tags/{self.tag}"
response = requests.get(url)
if response.status_code == 200:
return True
Expand All @@ -184,62 +239,47 @@ def exist(self) -> Optional[bool]:
click.secho(f"Failed to check if the image exists with error:\n {e}", fg="red")
return None

def __hash__(self):
return hash(asdict(self).__str__())
def _update_attribute(self, attr_name: str, values: Union[str, List[str]]) -> "ImageSpec":
"""
Generic method to update a specified list attribute, either appending or extending.
"""
current_value = copy.deepcopy(getattr(self, attr_name)) or []

if isinstance(values, str):
current_value.append(values)
elif isinstance(values, list):
current_value.extend(values)

return dataclasses.replace(self, **{attr_name: current_value})

def with_commands(self, commands: Union[str, List[str]]) -> "ImageSpec":
"""
Builder that returns a new image spec with an additional list of commands that will be executed during the building process.
"""
new_image_spec = copy.deepcopy(self)
if new_image_spec.commands is None:
new_image_spec.commands = []

if isinstance(commands, List):
new_image_spec.commands.extend(commands)
else:
new_image_spec.commands.append(commands)

return new_image_spec
return self._update_attribute("commands", commands)

def with_packages(self, packages: Union[str, List[str]]) -> "ImageSpec":
"""
Builder that returns a new image spec with additional python packages that will be installed during the building process.
"""
new_image_spec = copy.deepcopy(self)
if new_image_spec.packages is None:
new_image_spec.packages = []

if isinstance(packages, List):
new_image_spec.packages.extend(packages)
else:
new_image_spec.packages.append(packages)

new_image_spec = self._update_attribute("packages", packages)
return new_image_spec

def with_apt_packages(self, apt_packages: Union[str, List[str]]) -> "ImageSpec":
"""
Builder that returns a new image spec with additional list of apt packages that will be executed during the building process.
Builder that returns a new image spec with an additional list of apt packages that will be installed during the building process.
"""
new_image_spec = copy.deepcopy(self)
if new_image_spec.apt_packages is None:
new_image_spec.apt_packages = []

if isinstance(apt_packages, List):
new_image_spec.apt_packages.extend(apt_packages)
else:
new_image_spec.apt_packages.append(apt_packages)

new_image_spec = self._update_attribute("apt_packages", apt_packages)
return new_image_spec

def force_push(self) -> "ImageSpec":
"""
Builder that returns a new image spec with force push enabled.
"""
new_image_spec = copy.deepcopy(self)
new_image_spec._is_force_push = True
copied_image_spec = copy.deepcopy(self)
copied_image_spec._is_force_push = True

return new_image_spec
return copied_image_spec


class ImageSpecBuilder:
Expand Down Expand Up @@ -306,18 +346,23 @@ def build(cls, image_spec: ImageSpec):
if execution_mode is not None:
return

if isinstance(image_spec.base_image, ImageSpec):
cls.build(image_spec.base_image)
image_spec.base_image = image_spec.base_image.image_name()
spec = copy.deepcopy(image_spec)

if image_spec.builder is None and cls._REGISTRY:
if isinstance(spec.base_image, ImageSpec):
cls.build(spec.base_image)
spec.base_image = spec.base_image.image_name()

if spec.builder is None and cls._REGISTRY:
builder = max(cls._REGISTRY, key=lambda name: cls._REGISTRY[name][1])
else:
builder = image_spec.builder
builder = spec.builder

img_name = image_spec.image_name()
if cls._get_builder(builder).should_build(image_spec):
cls._build_image(builder, image_spec, img_name)
img_name = spec.image_name()
img_builder = cls._get_builder(builder)
if img_builder.should_build(spec):
fully_qualified_image_name = img_builder.build_image(spec)
if fully_qualified_image_name is not None:
cls._IMAGE_NAME_TO_REAL_NAME[img_name] = fully_qualified_image_name

@classmethod
def _get_builder(cls, builder: str) -> ImageSpecBuilder:
Expand All @@ -335,69 +380,3 @@ def _get_builder(cls, builder: str) -> ImageSpecBuilder:
f" Please upgrade envd to v0.3.39+."
)
return cls._REGISTRY[builder][0]

@classmethod
def _build_image(cls, builder: str, image_spec: ImageSpec, img_name: str):
fully_qualified_image_name = cls._get_builder(builder).build_image(image_spec)
if fully_qualified_image_name is not None:
cls._IMAGE_NAME_TO_REAL_NAME[img_name] = fully_qualified_image_name


@lru_cache
def _calculate_deduped_hash_from_image_spec(image_spec: ImageSpec):
    """
    Compute the hash that identifies an ImageSpec inside the ImageConfig of the serialization context.

    ImageConfig:
    - deduced hash 1: flyteorg/flytekit: 123
    - deduced hash 2: flyteorg/flytekit: 456

    The hash is the URL-safe base64 form (padding removed) of the MD5 digest of the string
    representation of ``asdict(image_spec)``. Results are memoized with ``lru_cache``, so the
    spec has to be hashable.

    :param image_spec: the image spec to identify.
    :return: the hash as an ASCII string.
    """
    serialized = str(asdict(image_spec)).encode("utf-8")
    digest = hashlib.md5(serialized).digest()
    encoded = base64.urlsafe_b64encode(digest).decode("ascii")
    return encoded.rstrip("=")


@lru_cache
def calculate_hash_from_image_spec(image_spec: ImageSpec):
    """
    Compute the image tag hash for an image spec.

    Before hashing, a copy of the spec is normalized:
    - a nested ``ImageSpec`` base image is replaced by its ``image_name()``;
    - ``source_root`` is replaced by the digest of that directory (ignore rules applied);
    - ``requirements`` is replaced by the SHA-1 hex digest of the requirements file's bytes;
    - ``registry_config`` is cleared.

    :param image_spec: the image spec to hash; it is not modified.
    :return: URL-safe base64 MD5 of the normalized spec, without padding and with "-" turned into "_".
    """
    # Normalize a deep copy so the caller's spec is never modified.
    hashed_spec = copy.deepcopy(image_spec)

    base = hashed_spec.base_image
    if isinstance(base, ImageSpec):
        hashed_spec.base_image = base.image_name()

    source_root = image_spec.source_root
    if source_root:
        from flytekit.tools.fast_registration import compute_digest
        from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore

        ignore_group = IgnoreGroup(source_root, [GitIgnore, DockerIgnore, StandardIgnore])
        hashed_spec.source_root = compute_digest(source_root, ignore_group.is_ignored)

    if hashed_spec.requirements:
        requirements_bytes = pathlib.Path(hashed_spec.requirements).read_bytes()
        hashed_spec.requirements = hashlib.sha1(requirements_bytes).hexdigest()

    # Changing the registry_config path must not trigger an image rebuild.
    hashed_spec.registry_config = None

    serialized = str(asdict(hashed_spec)).encode("utf-8")
    digest = hashlib.md5(serialized).digest()
    encoded = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
    # "-" is swapped for "_" to make it a valid tag.
    return encoded.replace("-", "_")


def hash_directory(path):
    """
    Return the SHA-256 hash of the directory at the given path.

    Only file contents are fed into the hash; file and directory names are not.
    Directories and files are visited in sorted name order, so the same tree always
    produces the same digest regardless of the order the filesystem lists entries in.

    :param path: root directory to hash.
    :return: the hex digest, encoded as UTF-8 bytes.
    """
    hasher = hashlib.sha256()
    for root, dirs, files in os.walk(path):
        # os.walk lists entries in arbitrary order; sorting `dirs` in place fixes the
        # order in which sub-directories are descended into.
        dirs.sort()
        for file in sorted(files):
            with open(os.path.join(root, file), "rb") as f:
                # Read file in small chunks to avoid loading large files into memory all at once
                for chunk in iter(lambda: f.read(4096), b""):
                    hasher.update(chunk)
    return bytes(hasher.hexdigest(), "utf-8")
5 changes: 1 addition & 4 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from flytekit.core.task import ReferenceTask
from flytekit.core.utils import ClassDecorator, _dnsify
from flytekit.core.workflow import ReferenceWorkflow, WorkflowBase
from flytekit.image_spec.image_spec import _calculate_deduped_hash_from_image_spec
from flytekit.models import common as _common_models
from flytekit.models import common as common_models
from flytekit.models import interface as interface_models
Expand Down Expand Up @@ -188,9 +187,7 @@ def get_serializable_task(
if settings.image_config.images is None:
settings.image_config = ImageConfig.create_from(settings.image_config.default_image)
settings.image_config.images.append(
Image.look_up_image_info(
_calculate_deduped_hash_from_image_spec(e.container_image), e.get_image(settings)
)
Image.look_up_image_info(e.container_image.id, e.get_image(settings))
)

# In case of Dynamic tasks, we want to pass the serialization context, so that they can reconstruct the state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def create_envd_config(image_spec: ImageSpec) -> str:
run_commands = _create_str_from_package_list(image_spec.commands)
conda_channels = _create_str_from_package_list(image_spec.conda_channels)
apt_packages = _create_str_from_package_list(image_spec.apt_packages)
env = {"PYTHONPATH": "/root:", _F_IMG_ID: image_spec.image_name()}
env = {"PYTHONPATH": "/root:", _F_IMG_ID: image_spec.id}

if image_spec.env:
env.update(image_spec.env)
Expand Down
Loading
Loading