@staticmethod
def clean_kube_labels(
    labels: Optional[Dict[str, Optional[str]]],
    max_len: int = 63,
    regex_sub: str = r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$",
) -> Optional[Dict[str, Optional[str]]]:
    """Sanitize label values so they are acceptable to Kubernetes.

    Inspired by the Apache Airflow label cleaner. A Kubernetes label value
    may be empty; otherwise it must be at most 63 characters long, start and
    end with an alphanumeric character, and contain only alphanumerics,
    ``-``, ``_`` and ``.``.

    Parameters
    ----------
    labels : Dict[str, Optional[str]], optional
        Mapping of label name to label value. Only the values are cleaned;
        keys are passed through untouched.
    max_len : int, default 63
        Maximum length of a cleaned value. It is expected to be larger than
        the hash suffix (18 hex characters plus a separator).
    regex_sub : str
        Regular expression whose matches are removed from each value.

    Returns
    -------
    Dict[str, Optional[str]], optional
        A new dict with cleaned values, or ``labels`` itself when it is
        ``None`` or empty.
    """
    if not labels:
        return labels

    # Compile once instead of re-resolving the pattern for every value.
    pattern = re.compile(regex_sub)

    def clean_label(s: Optional[str]) -> Optional[str]:
        if not s:
            # Empty (or missing) label values are allowed as-is.
            return s
        s_clean = pattern.sub("", s)
        if len(s_clean) <= max_len and s == s_clean:
            # Already a valid value; keep it readable and unchanged.
            return s_clean
        # The value was altered or is too long: append a short hash so the
        # result stays within max_len and is marked as modified.
        clean_hash = hashlib.blake2b(s_clean.encode(), digest_size=9).hexdigest()
        prefix = s_clean[: max_len - len(clean_hash) - 1]
        if not prefix:
            # Nothing valid survived the cleanup. Returning "-<hash>" would
            # start with a dash, which Kubernetes rejects, so use the hash
            # alone.
            return clean_hash
        return f"{prefix}-{clean_hash}"

    return {k: clean_label(v) for k, v in labels.items()}