Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MemVerge MMCloud Agent #1821

Merged
merged 10 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions plugins/flytekit-mmcloud/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Flytekit Memory Machine Cloud Plugin

Flyte Agent plugin to allow executing Flyte tasks using MemVerge Memory Machine Cloud.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-mmcloud
```

To get started with MMCloud, refer to the [MMCloud User Guide](https://docs.memverge.com/mmce/current/userguide/olh/index.html).

## Getting Started

This plugin allows executing `PythonFunctionTask` using MMCloud without changing any function code.

[Resource](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/productionizing/customizing_resources.html) (cpu and mem) requests and limits, [container](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/multi_images.html) images, and [environment](https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.task.html) variable specifications are supported.

[ImageSpec](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/image_spec.html) may be used to define images to run tasks.

### Credentials

The following [secrets](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/productionizing/use_secrets.html) are required to be defined for the agent server:
* `mmc_address`: MMCloud OpCenter address
* `mmc_username`: MMCloud OpCenter username
* `mmc_password`: MMCloud OpCenter password

### Defaults

Compute resources:
* If only requests are specified, there are no limits.
* If only limits are specified, the requests are equal to the limits.
* If neither resource requests nor limits are specified, the default requests used for job submission are `cpu="1"` and `mem="1Gi"`, and there are no limits.

### Example

`example.py` workflow example:
```python
import pandas as pd
from flytekit import ImageSpec, Resources, task, workflow
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

from flytekitplugins.mmcloud import MMCloudConfig

image_spec = ImageSpec(packages=["scikit-learn"], registry="docker.io/memverge")


@task
def get_data() -> pd.DataFrame:
"""Get the wine dataset."""
return load_wine(as_frame=True).frame


@task(task_config=MMCloudConfig(), container_image=image_spec) # Task will be submitted as MMCloud job
def process_data(data: pd.DataFrame) -> pd.DataFrame:
"""Simplify the task from a 3-class to a binary classification problem."""
return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task(
task_config=MMCloudConfig(submit_extra="--migratePolicy [enable=true]"),
requests=Resources(cpu="1", mem="1Gi"),
limits=Resources(cpu="2", mem="4Gi"),
container_image=image_spec,
environment={"KEY": "value"},
)
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
"""Train a model on the wine dataset."""
features = data.drop("target", axis="columns")
target = data["target"]
return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@workflow
def training_workflow(hyperparameters: dict) -> LogisticRegression:
"""Put all of the steps together into a single workflow."""
data = get_data()
processed_data = process_data(data=data)
return train_model(
data=processed_data,
hyperparameters=hyperparameters,
)
```

### Agent Image

Install `flytekitplugins-mmcloud` in the agent image.

A `float` binary (obtainable via the OpCenter) is required. Copy it to the agent image `PATH`.

Sample `Dockerfile` for building an agent image:
```dockerfile
FROM python:3.11-slim-bookworm

WORKDIR /root
ENV PYTHONPATH /root

# flytekit will autoload the agent if package is installed.
RUN pip install flytekitplugins-mmcloud
COPY float /usr/local/bin/float

CMD pyflyte serve --port 8000
```
16 changes: 16 additions & 0 deletions plugins/flytekit-mmcloud/flytekitplugins/mmcloud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
.. currentmodule:: flytekitplugins.mmcloud

This package contains things that are useful when extending Flytekit.

.. autosummary::
:template: custom.rst
:toctree: generated/

MMCloudConfig
MMCloudTask
MMCloudAgent
"""

from .agent import MMCloudAgent
from .task import MMCloudConfig, MMCloudTask
212 changes: 212 additions & 0 deletions plugins/flytekit-mmcloud/flytekitplugins/mmcloud/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import json
import shlex
import subprocess
from dataclasses import asdict, dataclass
from tempfile import NamedTemporaryFile
from typing import Optional

import grpc
from flyteidl.admin.agent_pb2 import CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, Resource
from flytekitplugins.mmcloud.utils import async_check_output, mmcloud_status_to_flyte_state

from flytekit import current_context
from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry
from flytekit.loggers import logger
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class Metadata:
job_id: str


class MMCloudAgent(AgentBase):
def __init__(self):
super().__init__(task_type="mmcloud_task")
self._response_format = ["--format", "json"]

async def async_login(self):
"""
Log in to Memory Machine Cloud OpCenter.
"""
try:
# If already logged in, this will reset the session timer
login_info_command = ["float", "login", "--info"]
await async_check_output(*login_info_command)
except subprocess.CalledProcessError:
logger.info("Attempting to log in to OpCenter")
try:
secrets = current_context().secrets
login_command = [
"float",
"login",
"--address",
secrets.get("mmc_address"),
"--username",
secrets.get("mmc_username"),
"--password",
secrets.get("mmc_password"),
]
await async_check_output(*login_command)
except subprocess.CalledProcessError:
logger.exception("Failed to log in to OpCenter")
raise

logger.info("Logged in to OpCenter")

async def async_create(
self,
context: grpc.ServicerContext,
output_prefix: str,
task_template: TaskTemplate,
inputs: Optional[LiteralMap] = None,
) -> CreateTaskResponse:
"""
Submit Flyte task as MMCloud job to the OpCenter, and return the job UID for the task.
"""
submit_command = [
"float",
"submit",
"--force",
*self._response_format,
]

# We do not use container.resources because FlytePropeller will impose limits that should not apply to MMCloud
min_cpu, min_mem, max_cpu, max_mem = task_template.custom["resources"]
submit_command.extend(["--cpu", f"{min_cpu}:{max_cpu}"] if max_cpu else ["--cpu", f"{min_cpu}"])
submit_command.extend(["--mem", f"{min_mem}:{max_mem}"] if max_mem else ["--mem", f"{min_mem}"])

container = task_template.container

image = container.image
submit_command.extend(["--image", image])

env = container.env
for key, value in env.items():
submit_command.extend(["--env", f"{key}={value}"])

submit_extra = task_template.custom["submit_extra"]
submit_command.extend(shlex.split(submit_extra))

args = task_template.container.args
script_lines = ["#!/bin/bash\n", f"{shlex.join(args)}\n"]

task_id = task_template.id
try:
# float binary takes a job file as input, so one must be created
# Use a uniquely named temporary file to avoid race conditions and clutter
with NamedTemporaryFile(mode="w") as job_file:
job_file.writelines(script_lines)
# Flush immediately so that the job file is usable
job_file.flush()
logger.debug("Wrote job script")

submit_command.extend(["--job", job_file.name])

logger.info(f"Attempting to submit Flyte task {task_id} as MMCloud job")
logger.debug(f"With command: {submit_command}")
try:
await self.async_login()
submit_response = await async_check_output(*submit_command)
submit_response = json.loads(submit_response.decode())
job_id = submit_response["id"]
except subprocess.CalledProcessError as e:
logger.exception(
f"Failed to submit Flyte task {task_id} as MMCloud job\n"
f"[stdout] {e.stdout.decode()}\n"
f"[stderr] {e.stderr.decode()}\n"
)
raise
except (UnicodeError, json.JSONDecodeError):
logger.exception(f"Failed to decode submit response for Flyte task: {task_id}")
raise
except KeyError:
logger.exception(f"Failed to obtain MMCloud job id for Flyte task: {task_id}")
raise

logger.info(f"Submitted Flyte task {task_id} as MMCloud job {job_id}")
logger.debug(f"OpCenter response: {submit_response}")
except OSError:
logger.exception("Cannot open job script for writing")
raise

metadata = Metadata(job_id=job_id)

return CreateTaskResponse(resource_meta=json.dumps(asdict(metadata)).encode("utf-8"))

async def async_get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse:
"""
Return the status of the task, and return the outputs on success.
"""
metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
job_id = metadata.job_id

show_command = [
"float",
"show",
*self._response_format,
"--job",
job_id,
]

logger.info(f"Attempting to obtain status for MMCloud job {job_id}")
logger.debug(f"With command: {show_command}")
try:
await self.async_login()
show_response = await async_check_output(*show_command)
show_response = json.loads(show_response.decode())
job_status = show_response["status"]
except subprocess.CalledProcessError as e:
logger.exception(
f"Failed to get show response for MMCloud job: {job_id}\n"
f"[stdout] {e.stdout.decode()}\n"
f"[stderr] {e.stderr.decode()}\n"
)
raise
except (UnicodeError, json.JSONDecodeError):
logger.exception(f"Failed to decode show response for MMCloud job: {job_id}")
raise
except KeyError:
logger.exception(f"Failed to obtain status for MMCloud job: {job_id}")
raise

task_state = mmcloud_status_to_flyte_state(job_status)

logger.info(f"Obtained status for MMCloud job {job_id}: {job_status}")
logger.debug(f"OpCenter response: {show_response}")

return GetTaskResponse(resource=Resource(state=task_state))

async def async_delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse:
"""
Delete the task. This call should be idempotent.
"""
metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
job_id = metadata.job_id

cancel_command = [
"float",
"cancel",
"--force",
"--job",
job_id,
]

logger.info(f"Attempting to cancel MMCloud job {job_id}")
logger.debug(f"With command: {cancel_command}")
try:
await self.async_login()
await async_check_output(*cancel_command)
except subprocess.CalledProcessError as e:
logger.exception(
f"Failed to cancel MMCloud job: {job_id}\n[stdout] {e.stdout.decode()}\n[stderr] {e.stderr.decode()}\n"
)
raise

logger.info(f"Submitted cancel request for MMCloud job: {job_id}")

return DeleteTaskResponse()


AgentRegistry.register(MMCloudAgent())
64 changes: 64 additions & 0 deletions plugins/flytekit-mmcloud/flytekitplugins/mmcloud/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union

from flytekitplugins.mmcloud.utils import flyte_to_mmcloud_resources
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit.configuration import SerializationSettings
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.resources import Resources
from flytekit.extend import TaskPlugins
from flytekit.image_spec.image_spec import ImageSpec


@dataclass
class MMCloudConfig(object):
"""
Configures MMCloudTask. Tasks specified with MMCloudConfig will be executed using Memory Machine Cloud.
"""

# This allows the user to specify additional arguments for the float submit command
submit_extra: str = ""


class MMCloudTask(PythonFunctionTask):
_TASK_TYPE = "mmcloud_task"

def __init__(
self,
task_config: Optional[MMCloudConfig],
task_function: Callable,
container_image: Optional[Union[str, ImageSpec]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
**kwargs,
):
super().__init__(
task_config=task_config or MMCloudConfig(),
task_type=self._TASK_TYPE,
task_function=task_function,
container_image=container_image,
**kwargs,
)

self._mmcloud_resources = flyte_to_mmcloud_resources(requests=requests, limits=limits)

def execute(self, **kwargs) -> Any:
return PythonFunctionTask.execute(self, **kwargs)

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
"""
Return plugin-specific data as a serializable dictionary.
"""
config = {
"submit_extra": self.task_config.submit_extra,
"resources": [str(resource) if resource else None for resource in self._mmcloud_resources],
}
s = Struct()
s.update(config)
return json_format.MessageToDict(s)


TaskPlugins.register_pythontask_plugin(MMCloudConfig, MMCloudTask)
Loading
Loading