Skip to content

Commit

Permalink
Add kubernetes labels (#1236)
Browse files Browse the repository at this point in the history
* Add kubernetes labels

* Add labels to cli

* Empty labels fix

* Fix labels in decorator

* Allow dictionaries in decorator
* Convert strings to dictionaries
* Make parse node selector function more generic

* Add argo labels

* Clean kubernetes labels

* Hash original string and use shorter hash

* Fix command join

* Add rename parse list and add tests for value cleaning

* override spec parser

* Don't reencode json objects

* Strip quotes

* Fix rebase error

* Throw exception for invalid labels

* Changes based on PR comments

* Fix invalid label error message

* Add code comment to kube validation function

* Add tests and fix bad json

* Fix pre-commit error

* Remove f-strings :(

* More python 3.5 fixes
  • Loading branch information
dhpollack authored Apr 17, 2023
1 parent e148cd8 commit a992dde
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 19 deletions.
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@
KUBERNETES_NODE_SELECTOR = from_conf("KUBERNETES_NODE_SELECTOR", "")
KUBERNETES_TOLERATIONS = from_conf("KUBERNETES_TOLERATIONS", "")
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
Expand Down
6 changes: 4 additions & 2 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,20 +887,22 @@ def _container_templates(self):
.retry_strategy(
times=total_retries,
minutes_between_retries=minutes_between_retries,
)
.metadata(
).metadata(
ObjectMeta().annotation("metaflow/step_name", node.name)
# Unfortunately, we can't set the task_id since it is generated
# inside the pod. However, it can be inferred from the annotation
# set by argo-workflows - `workflows.argoproj.io/outputs` - refer
# the field 'task-id' in 'parameters'
# .annotation("metaflow/task_id", ...)
.annotation("metaflow/attempt", retry_count)
# Set labels
.labels(resources.get("labels"))
)
# Set emptyDir volume for state management
.empty_dir_volume("out")
# Set node selectors
.node_selectors(resources.get("node_selector"))
# Set tolerations
.tolerations(resources.get("tolerations"))
# Set container
.container(
Expand Down
6 changes: 5 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from metaflow.metaflow_config import (
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
CARD_AZUREROOT,
CARD_GSROOT,
CARD_S3ROOT,
DATASTORE_SYSROOT_S3,
DATATOOLS_S3ROOT,
Expand All @@ -29,8 +31,8 @@
BASH_SAVE_LOGS,
bash_capture_logs,
export_mflog_env_vars,
tail_logs,
get_log_tailer,
tail_logs,
)

from .kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -152,6 +154,7 @@ def create_job(
run_time_limit=None,
env=None,
tolerations=None,
labels=None,
):

if env is None:
Expand Down Expand Up @@ -185,6 +188,7 @@ def create_job(
retries=0,
step_name=step_name,
tolerations=tolerations,
labels=labels,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
17 changes: 15 additions & 2 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import traceback

from metaflow import util, JSONTypeClass
from metaflow import JSONTypeClass, util
from metaflow._vendor import click
from metaflow.exception import METAFLOW_EXIT_DISALLOW_RETRY, CommandException
from metaflow.metadata.util import sync_local_metadata_from_datastore
Expand Down Expand Up @@ -91,6 +91,12 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--labels",
multiple=True,
default=None,
help="Labels for Kubernetes pod.",
)
@click.pass_context
def step(
ctx,
Expand All @@ -110,6 +116,7 @@ def step(
gpu_vendor=None,
run_time_limit=None,
tolerations=None,
labels=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -175,7 +182,12 @@ def echo(msg, stream="stderr", job_id=None, **kwargs):
stderr_location = ds.get_log_location(TASK_LOG_SOURCE, "stderr")

# `node_selector` is a tuple of strings, convert it to a dictionary
node_selector = KubernetesDecorator.parse_node_selector(node_selector)
node_selector = KubernetesDecorator.parse_kube_keyvalue_list(node_selector)

# `labels` is a tuple of strings or a tuple with a single comma separated string
# convert it to a dict
labels = KubernetesDecorator.parse_kube_keyvalue_list(labels, False)
KubernetesDecorator.validate_kube_labels(labels)

def _sync_metadata():
if ctx.obj.metadata.TYPE == "local":
Expand Down Expand Up @@ -218,6 +230,7 @@ def _sync_metadata():
run_time_limit=run_time_limit,
env=env,
tolerations=tolerations,
labels=labels,
)
except Exception as e:
traceback.print_exc(chain=False)
Expand Down
110 changes: 96 additions & 14 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import hashlib
import json
import os
import platform
import re
import sys
from typing import Dict, List, Optional, Union

from metaflow.decorators import StepDecorator
from metaflow.exception import MetaflowException
Expand All @@ -12,11 +15,12 @@
KUBERNETES_CONTAINER_IMAGE,
KUBERNETES_CONTAINER_REGISTRY,
KUBERNETES_GPU_VENDOR,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_TOLERATIONS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SECRETS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_FETCH_EC2_METADATA,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
Expand Down Expand Up @@ -65,6 +69,8 @@ class KubernetesDecorator(StepDecorator):
in Metaflow configuration.
tolerations : List[str], default: METAFLOW_KUBERNETES_TOLERATIONS
Kubernetes tolerations to use when launching pod in Kubernetes.
labels : Dict[str, str], default: METAFLOW_KUBERNETES_LABELS
Kubernetes labels to use when launching pod in Kubernetes.
"""

name = "kubernetes"
Expand All @@ -76,6 +82,7 @@ class KubernetesDecorator(StepDecorator):
"service_account": None,
"secrets": None, # e.g., mysecret
"node_selector": None, # e.g., kubernetes.io/os=linux
"labels": None, # e.g., my_label=my_value
"namespace": None,
"gpu": None, # value of 0 implies that the scheduled node should not have GPUs
"gpu_vendor": None,
Expand All @@ -99,9 +106,17 @@ def __init__(self, attributes=None, statically_defined=False):
self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR
if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS:
self.attributes["tolerations"] = json.loads(KUBERNETES_TOLERATIONS)
if not self.attributes["labels"] and KUBERNETES_LABELS:
self.attributes["labels"] = KUBERNETES_LABELS

if isinstance(self.attributes["labels"], str):
self.attributes["labels"] = self.parse_kube_keyvalue_list(
self.attributes["labels"].split(","), False
)
self.validate_kube_labels(self.attributes["labels"])

if isinstance(self.attributes["node_selector"], str):
self.attributes["node_selector"] = self.parse_node_selector(
self.attributes["node_selector"] = self.parse_kube_keyvalue_list(
self.attributes["node_selector"].split(",")
)

Expand Down Expand Up @@ -280,10 +295,11 @@ def runtime_step_cli(
for k, v in self.attributes.items():
if k == "namespace":
cli_args.command_options["k8s_namespace"] = v
elif k == "node_selector" and v:
cli_args.command_options[k] = ",".join(
["=".join([key, str(val)]) for key, val in v.items()]
)
elif k in {"node_selector", "labels"} and v:
cli_args.command_options[k] = [
"=".join([key, str(val)]) if val else key
for key, val in v.items()
]
elif k == "tolerations":
cli_args.command_options[k] = json.dumps(v)
else:
Expand Down Expand Up @@ -391,14 +407,80 @@ def _save_package_once(cls, flow_datastore, package):
[package.blob], len_hint=1
)[0]

@classmethod
def _parse_decorator_spec(cls, deco_spec: str):
if not deco_spec:
return cls()

valid_options = "|".join(cls.defaults.keys())
deco_spec_parts = []
for part in re.split(""",(?=[\s\w]+[{}]=)""".format(valid_options), deco_spec):
name, val = part.split("=", 1)
if name in {"labels", "node_selector"}:
try:
tmp_vals = json.loads(val.strip().replace('\\"', '"'))
for val_i in tmp_vals.values():
if not (val_i is None or isinstance(val_i, str)):
raise KubernetesException(
"All values must be string or null."
)
except json.JSONDecodeError:
if val.startswith("{"):
raise KubernetesException(
"Malform json detected in %s" % str(val)
)
both = name == "node_selector"
val = json.dumps(
cls.parse_kube_keyvalue_list(val.split(","), both),
separators=(",", ":"),
)
deco_spec_parts.append("=".join([name, val]))
deco_spec_parsed = ",".join(deco_spec_parts)
return super()._parse_decorator_spec(deco_spec_parsed)

@staticmethod
def parse_node_selector(node_selector: list):
def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True):
try:
return {
str(k.split("=", 1)[0]): str(k.split("=", 1)[1])
for k in node_selector or []
}
ret = {}
for item_str in items:
item = item_str.split("=", 1)
if requires_both:
item[1] # raise IndexError
if str(item[0]) in ret:
raise KubernetesException("Duplicate key found: %s" % str(item[0]))
ret[str(item[0])] = str(item[1]) if len(item) > 1 else None
return ret
except KubernetesException as e:
raise e
except (AttributeError, IndexError):
raise KubernetesException(
"Unable to parse node_selector: %s" % node_selector
)
raise KubernetesException("Unable to parse kubernetes list: %s" % items)

@staticmethod
def validate_kube_labels(
labels: Optional[Dict[str, Optional[str]]],
) -> bool:
"""Validate label values.
This validates the kubernetes label values. It does not validate the keys.
Ideally, keys should be static and also the validation rules for keys are
more complex than those for values. For full validation rules, see:
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
"""

def validate_label(s: Optional[str]):
regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$"
if not s:
# allow empty label
return True
if not re.search(regex_match, s):
raise KubernetesException(
'Invalid value: "%s"\n'
"A valid label must be an empty string or one that\n"
" - Consist of alphanumeric, '-', '_' or '.' characters\n"
" - Begins and ends with an alphanumeric character\n"
" - Is at most 63 characters" % s
)
return True

return all([validate_label(v) for v in labels.values()]) if labels else True
94 changes: 94 additions & 0 deletions test/unit/test_kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import pytest

from metaflow.plugins.kubernetes.kubernetes import KubernetesException
from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator


@pytest.mark.parametrize(
"labels",
[
None,
{"label": "value"},
{"label1": "val1", "label2": "val2"},
{"label1": "val1", "label2": None},
{"label": "a"},
{"label": ""},
{
"label": (
"1234567890"
"1234567890"
"1234567890"
"1234567890"
"1234567890"
"1234567890"
"123"
)
},
{
"label": (
"1234567890"
"1234567890"
"1234-_.890"
"1234567890"
"1234567890"
"1234567890"
"123"
)
},
],
)
def test_kubernetes_decorator_validate_kube_labels(labels):
assert KubernetesDecorator.validate_kube_labels(labels)


@pytest.mark.parametrize(
"labels",
[
{"label": "a-"},
{"label": ".a"},
{"label": "test()"},
{
"label": (
"1234567890"
"1234567890"
"1234567890"
"1234567890"
"1234567890"
"1234567890"
"1234"
)
},
{"label": "(){}??"},
{"valid": "test", "invalid": "bißchen"},
],
)
def test_kubernetes_decorator_validate_kube_labels_fail(labels):
"""Fail if label contains invalid characters or is too long"""
with pytest.raises(KubernetesException):
KubernetesDecorator.validate_kube_labels(labels)


@pytest.mark.parametrize(
"items,requires_both,expected",
[
(["key=value"], True, {"key": "value"}),
(["key=value"], False, {"key": "value"}),
(["key"], False, {"key": None}),
(["key=value", "key2=value2"], True, {"key": "value", "key2": "value2"}),
],
)
def test_kubernetes_parse_keyvalue_list(items, requires_both, expected):
ret = KubernetesDecorator.parse_kube_keyvalue_list(items, requires_both)
assert ret == expected


@pytest.mark.parametrize(
"items,requires_both",
[
(["key=value", "key=value2"], True),
(["key"], True),
],
)
def test_kubernetes_parse_keyvalue_list(items, requires_both):
with pytest.raises(KubernetesException):
KubernetesDecorator.parse_kube_keyvalue_list(items, requires_both)

0 comments on commit a992dde

Please sign in to comment.