Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
MemVerge MMCloud Agent (flyteorg#1821)
Browse files Browse the repository at this point in the history
Signed-off-by: Edwin Yu <edwinyyyu@gmail.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
  • Loading branch information
edwinyyyu authored and Future Outlier committed Oct 3, 2023
1 parent 8197069 commit 7f5e657
Show file tree
Hide file tree
Showing 8 changed files with 757 additions and 0 deletions.
104 changes: 104 additions & 0 deletions plugins/flytekit-mmcloud/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Flytekit Memory Machine Cloud Plugin

Flyte Agent plugin to allow executing Flyte tasks using MemVerge Memory Machine Cloud.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-mmcloud
```

To get started with MMCloud, refer to the [MMCloud User Guide](https://docs.memverge.com/mmce/current/userguide/olh/index.html).

## Getting Started

This plugin allows executing `PythonFunctionTask` using MMCloud without changing any function code.

[Resource](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/productionizing/customizing_resources.html) (cpu and mem) requests and limits, [container](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/multi_images.html) images, and [environment](https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.task.html) variable specifications are supported.

[ImageSpec](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/image_spec.html) may be used to define images to run tasks.

### Credentials

The following [secrets](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/productionizing/use_secrets.html) are required to be defined for the agent server:
* `mmc_address`: MMCloud OpCenter address
* `mmc_username`: MMCloud OpCenter username
* `mmc_password`: MMCloud OpCenter password

### Defaults

Compute resources:
* If only requests are specified, there are no limits.
* If only limits are specified, the requests are equal to the limits.
* If neither resource requests nor limits are specified, the default requests used for job submission are `cpu="1"` and `mem="1Gi"`, and there are no limits.

### Example

`example.py` workflow example:
```python
import pandas as pd
from flytekit import ImageSpec, Resources, task, workflow
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

from flytekitplugins.mmcloud import MMCloudConfig

image_spec = ImageSpec(packages=["scikit-learn"], registry="docker.io/memverge")


@task
def get_data() -> pd.DataFrame:
"""Get the wine dataset."""
return load_wine(as_frame=True).frame


@task(task_config=MMCloudConfig(), container_image=image_spec) # Task will be submitted as MMCloud job
def process_data(data: pd.DataFrame) -> pd.DataFrame:
"""Simplify the task from a 3-class to a binary classification problem."""
return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task(
task_config=MMCloudConfig(submit_extra="--migratePolicy [enable=true]"),
requests=Resources(cpu="1", mem="1Gi"),
limits=Resources(cpu="2", mem="4Gi"),
container_image=image_spec,
environment={"KEY": "value"},
)
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
"""Train a model on the wine dataset."""
features = data.drop("target", axis="columns")
target = data["target"]
return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@workflow
def training_workflow(hyperparameters: dict) -> LogisticRegression:
"""Put all of the steps together into a single workflow."""
data = get_data()
processed_data = process_data(data=data)
return train_model(
data=processed_data,
hyperparameters=hyperparameters,
)
```

### Agent Image

Install `flytekitplugins-mmcloud` in the agent image.

A `float` binary (obtainable via the OpCenter) is required. Copy it to the agent image `PATH`.

Sample `Dockerfile` for building an agent image:
```dockerfile
FROM python:3.11-slim-bookworm

WORKDIR /root
ENV PYTHONPATH /root

# flytekit will autoload the agent if package is installed.
RUN pip install flytekitplugins-mmcloud
COPY float /usr/local/bin/float

CMD pyflyte serve --port 8000
```
16 changes: 16 additions & 0 deletions plugins/flytekit-mmcloud/flytekitplugins/mmcloud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
.. currentmodule:: flytekitplugins.mmcloud
This package contains things that are useful when extending Flytekit.
.. autosummary::
:template: custom.rst
:toctree: generated/
MMCloudConfig
MMCloudTask
MMCloudAgent
"""

from .agent import MMCloudAgent
from .task import MMCloudConfig, MMCloudTask
212 changes: 212 additions & 0 deletions plugins/flytekit-mmcloud/flytekitplugins/mmcloud/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import json
import shlex
import subprocess
from dataclasses import asdict, dataclass
from tempfile import NamedTemporaryFile
from typing import Optional

import grpc
from flyteidl.admin.agent_pb2 import CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, Resource
from flytekitplugins.mmcloud.utils import async_check_output, mmcloud_status_to_flyte_state

from flytekit import current_context
from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry
from flytekit.loggers import logger
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class Metadata:
job_id: str


class MMCloudAgent(AgentBase):
def __init__(self):
super().__init__(task_type="mmcloud_task")
self._response_format = ["--format", "json"]

async def async_login(self):
"""
Log in to Memory Machine Cloud OpCenter.
"""
try:
# If already logged in, this will reset the session timer
login_info_command = ["float", "login", "--info"]
await async_check_output(*login_info_command)
except subprocess.CalledProcessError:
logger.info("Attempting to log in to OpCenter")
try:
secrets = current_context().secrets
login_command = [
"float",
"login",
"--address",
secrets.get("mmc_address"),
"--username",
secrets.get("mmc_username"),
"--password",
secrets.get("mmc_password"),
]
await async_check_output(*login_command)
except subprocess.CalledProcessError:
logger.exception("Failed to log in to OpCenter")
raise

logger.info("Logged in to OpCenter")

async def async_create(
self,
context: grpc.ServicerContext,
output_prefix: str,
task_template: TaskTemplate,
inputs: Optional[LiteralMap] = None,
) -> CreateTaskResponse:
"""
Submit Flyte task as MMCloud job to the OpCenter, and return the job UID for the task.
"""
submit_command = [
"float",
"submit",
"--force",
*self._response_format,
]

# We do not use container.resources because FlytePropeller will impose limits that should not apply to MMCloud
min_cpu, min_mem, max_cpu, max_mem = task_template.custom["resources"]
submit_command.extend(["--cpu", f"{min_cpu}:{max_cpu}"] if max_cpu else ["--cpu", f"{min_cpu}"])
submit_command.extend(["--mem", f"{min_mem}:{max_mem}"] if max_mem else ["--mem", f"{min_mem}"])

container = task_template.container

image = container.image
submit_command.extend(["--image", image])

env = container.env
for key, value in env.items():
submit_command.extend(["--env", f"{key}={value}"])

submit_extra = task_template.custom["submit_extra"]
submit_command.extend(shlex.split(submit_extra))

args = task_template.container.args
script_lines = ["#!/bin/bash\n", f"{shlex.join(args)}\n"]

task_id = task_template.id
try:
# float binary takes a job file as input, so one must be created
# Use a uniquely named temporary file to avoid race conditions and clutter
with NamedTemporaryFile(mode="w") as job_file:
job_file.writelines(script_lines)
# Flush immediately so that the job file is usable
job_file.flush()
logger.debug("Wrote job script")

submit_command.extend(["--job", job_file.name])

logger.info(f"Attempting to submit Flyte task {task_id} as MMCloud job")
logger.debug(f"With command: {submit_command}")
try:
await self.async_login()
submit_response = await async_check_output(*submit_command)
submit_response = json.loads(submit_response.decode())
job_id = submit_response["id"]
except subprocess.CalledProcessError as e:
logger.exception(
f"Failed to submit Flyte task {task_id} as MMCloud job\n"
f"[stdout] {e.stdout.decode()}\n"
f"[stderr] {e.stderr.decode()}\n"
)
raise
except (UnicodeError, json.JSONDecodeError):
logger.exception(f"Failed to decode submit response for Flyte task: {task_id}")
raise
except KeyError:
logger.exception(f"Failed to obtain MMCloud job id for Flyte task: {task_id}")
raise

logger.info(f"Submitted Flyte task {task_id} as MMCloud job {job_id}")
logger.debug(f"OpCenter response: {submit_response}")
except OSError:
logger.exception("Cannot open job script for writing")
raise

metadata = Metadata(job_id=job_id)

return CreateTaskResponse(resource_meta=json.dumps(asdict(metadata)).encode("utf-8"))

async def async_get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse:
"""
Return the status of the task, and return the outputs on success.
"""
metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
job_id = metadata.job_id

show_command = [
"float",
"show",
*self._response_format,
"--job",
job_id,
]

logger.info(f"Attempting to obtain status for MMCloud job {job_id}")
logger.debug(f"With command: {show_command}")
try:
await self.async_login()
show_response = await async_check_output(*show_command)
show_response = json.loads(show_response.decode())
job_status = show_response["status"]
except subprocess.CalledProcessError as e:
logger.exception(
f"Failed to get show response for MMCloud job: {job_id}\n"
f"[stdout] {e.stdout.decode()}\n"
f"[stderr] {e.stderr.decode()}\n"
)
raise
except (UnicodeError, json.JSONDecodeError):
logger.exception(f"Failed to decode show response for MMCloud job: {job_id}")
raise
except KeyError:
logger.exception(f"Failed to obtain status for MMCloud job: {job_id}")
raise

task_state = mmcloud_status_to_flyte_state(job_status)

logger.info(f"Obtained status for MMCloud job {job_id}: {job_status}")
logger.debug(f"OpCenter response: {show_response}")

return GetTaskResponse(resource=Resource(state=task_state))

async def async_delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse:
"""
Delete the task. This call should be idempotent.
"""
metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
job_id = metadata.job_id

cancel_command = [
"float",
"cancel",
"--force",
"--job",
job_id,
]

logger.info(f"Attempting to cancel MMCloud job {job_id}")
logger.debug(f"With command: {cancel_command}")
try:
await self.async_login()
await async_check_output(*cancel_command)
except subprocess.CalledProcessError as e:
logger.exception(
f"Failed to cancel MMCloud job: {job_id}\n[stdout] {e.stdout.decode()}\n[stderr] {e.stderr.decode()}\n"
)
raise

logger.info(f"Submitted cancel request for MMCloud job: {job_id}")

return DeleteTaskResponse()


AgentRegistry.register(MMCloudAgent())
64 changes: 64 additions & 0 deletions plugins/flytekit-mmcloud/flytekitplugins/mmcloud/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union

from flytekitplugins.mmcloud.utils import flyte_to_mmcloud_resources
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit.configuration import SerializationSettings
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.resources import Resources
from flytekit.extend import TaskPlugins
from flytekit.image_spec.image_spec import ImageSpec


@dataclass
class MMCloudConfig(object):
"""
Configures MMCloudTask. Tasks specified with MMCloudConfig will be executed using Memory Machine Cloud.
"""

# This allows the user to specify additional arguments for the float submit command
submit_extra: str = ""


class MMCloudTask(PythonFunctionTask):
_TASK_TYPE = "mmcloud_task"

def __init__(
self,
task_config: Optional[MMCloudConfig],
task_function: Callable,
container_image: Optional[Union[str, ImageSpec]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
**kwargs,
):
super().__init__(
task_config=task_config or MMCloudConfig(),
task_type=self._TASK_TYPE,
task_function=task_function,
container_image=container_image,
**kwargs,
)

self._mmcloud_resources = flyte_to_mmcloud_resources(requests=requests, limits=limits)

def execute(self, **kwargs) -> Any:
return PythonFunctionTask.execute(self, **kwargs)

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
"""
Return plugin-specific data as a serializable dictionary.
"""
config = {
"submit_extra": self.task_config.submit_extra,
"resources": [str(resource) if resource else None for resource in self._mmcloud_resources],
}
s = Struct()
s.update(config)
return json_format.MessageToDict(s)


TaskPlugins.register_pythontask_plugin(MMCloudConfig, MMCloudTask)
Loading

0 comments on commit 7f5e657

Please sign in to comment.