Skip to content

Commit

Permalink
[OCI] Enable SkyServe for OCI (#4338)
Browse files Browse the repository at this point in the history
* [OCI] Enable SkyServe for OCI

* enable open_ports

* fix

* Add example serve-qwen-7b.yaml

* fix

* format

* Skip check the source CIDR so that user can control the security by manually.

* Update sky/provision/oci/query_utils.py

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* Update sky/provision/oci/query_utils.py

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* Update sky/provision/oci/query_utils.py

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* nit

* Implement open_ports/cleanup_ports per cluster

* Address review comments

* naming

* debug info

* remove unneccessary logic

* detach the nsg before instance termination

* typo

* Add example

* same file already exists in examples/serve folder

* Add example for serve cpu resource task.

* nit

* Address review comments: mainly eliminate the port
overlap issue.

* Add a smoke test

* nit

* OCI now supports open_port

---------

Co-authored-by: Tian Xia <cblmemo@gmail.com>
  • Loading branch information
HysunHe and cblmemo authored Nov 15, 2024
1 parent a2278cb commit e41ce2a
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 26 deletions.
11 changes: 11 additions & 0 deletions examples/oci/serve-http-cpu.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
service:
readiness_probe: /
replicas: 2

resources:
cloud: oci
region: us-sanjose-1
ports: 8080
cpus: 2+

run: python -m http.server 8080
25 changes: 25 additions & 0 deletions examples/oci/serve-qwen-7b.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# service.yaml
service:
readiness_probe: /v1/models
replicas: 2

# Fields below describe each replica.
resources:
cloud: oci
region: us-sanjose-1
ports: 8080
accelerators: {A10:1}

setup: |
conda create -n vllm python=3.12 -y
conda activate vllm
pip install vllm
pip install vllm-flash-attn
run: |
conda activate vllm
python -u -m vllm.entrypoints.openai.api_server \
--host 0.0.0.0 --port 8080 \
--model Qwen/Qwen2-7B-Instruct \
--served-model-name Qwen2-7B-Instruct \
--device=cuda --dtype auto --max-model-len=2048
2 changes: 0 additions & 2 deletions sky/clouds/oci.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ def _unsupported_features_for_resources(
(f'Docker image is currently not supported on {cls._REPR}. '
'You can try running docker command inside the '
'`run` section in task.yaml.'),
clouds.CloudImplementationFeatures.OPEN_PORTS:
(f'Opening ports is currently not supported on {cls._REPR}.'),
}
if resources.use_spot:
features[clouds.CloudImplementationFeatures.STOP] = (
Expand Down
5 changes: 5 additions & 0 deletions sky/clouds/utils/oci_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
- Zhanghao Wu @ Oct 2023: Formatting and refactoring
- Hysun He (hysun.he@oracle.com) @ Oct, 2024: Add default image OS
configuration.
- Hysun He (hysun.he@oracle.com) @ Nov.12, 2024: Add the constant
SERVICE_PORT_RULE_TAG
"""
import os

Expand Down Expand Up @@ -42,6 +44,9 @@ class OCIConfig:
VCN_CIDR_INTERNET = '0.0.0.0/0'
VCN_CIDR = '192.168.0.0/16'
VCN_SUBNET_CIDR = '192.168.0.0/18'
SERVICE_PORT_RULE_TAG = 'SkyServe-Service-Port'
# NSG name template
NSG_NAME_TEMPLATE = 'nsg_{cluster_name}'

MAX_RETRY_COUNT = 3
RETRY_INTERVAL_BASE_SECONDS = 5
Expand Down
23 changes: 12 additions & 11 deletions sky/provision/oci/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
History:
- Hysun He (hysun.he@oracle.com) @ Oct.16, 2024: Initial implementation
- Hysun He (hysun.he@oracle.com) @ Nov.13, 2024: Implement open_ports
and cleanup_ports for supporting SkyServe.
"""

import copy
Expand Down Expand Up @@ -292,11 +294,11 @@ def open_ports(
provider_config: Optional[Dict[str, Any]] = None,
) -> None:
"""Open ports for inbound traffic."""
# OCI ports in security groups are opened while creating the new
# VCN (skypilot_vcn). If user configure to use existing VCN, it is
# intended to let user to manage the ports instead of automatically
# opening ports here.
del cluster_name_on_cloud, ports, provider_config
assert provider_config is not None, cluster_name_on_cloud
region = provider_config['region']
query_helper.create_nsg_rules(region=region,
cluster_name=cluster_name_on_cloud,
ports=ports)


@query_utils.debug_enabled(logger)
Expand All @@ -306,12 +308,11 @@ def cleanup_ports(
provider_config: Optional[Dict[str, Any]] = None,
) -> None:
"""Delete any opened ports."""
del cluster_name_on_cloud, ports, provider_config
# OCI ports in security groups are opened while creating the new
# VCN (skypilot_vcn). The VCN will only be created at the first
# time when it is not existed. We'll not automatically delete the
# VCN while teardown clusters. it is intended to let user to decide
# to delete the VCN or not from OCI console, for example.
assert provider_config is not None, cluster_name_on_cloud
region = provider_config['region']
del ports
query_helper.remove_cluster_nsg(region=region,
cluster_name=cluster_name_on_cloud)


@query_utils.debug_enabled(logger)
Expand Down
218 changes: 212 additions & 6 deletions sky/provision/oci/query_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
migrated from the old provisioning API.
- Hysun He (hysun.he@oracle.com) @ Oct.18, 2024: Enhancement.
find_compartment: allow search subtree when find a compartment.
- Hysun He (hysun.he@oracle.com) @ Nov.12, 2024: Add methods to
Add/remove security rules: create_nsg_rules & remove_nsg
"""
from datetime import datetime
import functools
Expand All @@ -13,12 +15,15 @@
import time
import traceback
import typing
from typing import Optional
from typing import List, Optional, Tuple

from sky import exceptions
from sky import sky_logging
from sky.adaptors import common as adaptors_common
from sky.adaptors import oci as oci_adaptor
from sky.clouds.utils import oci_utils
from sky.provision import constants
from sky.utils import resources_utils

if typing.TYPE_CHECKING:
import pandas as pd
Expand Down Expand Up @@ -81,19 +86,33 @@ def query_instances_by_tags(cls, tag_filters, region):
return result_set

@classmethod
@debug_enabled(logger)
def terminate_instances_by_tags(cls, tag_filters, region) -> int:
logger.debug(f'Terminate instance by tags: {tag_filters}')

cluster_name = tag_filters[constants.TAG_RAY_CLUSTER_NAME]
nsg_name = oci_utils.oci_config.NSG_NAME_TEMPLATE.format(
cluster_name=cluster_name)
nsg_id = cls.find_nsg(region, nsg_name, create_if_not_exist=False)

core_client = oci_adaptor.get_core_client(
region, oci_utils.oci_config.get_profile())

insts = cls.query_instances_by_tags(tag_filters, region)
fail_count = 0
for inst in insts:
inst_id = inst.identifier
logger.debug(f'Got instance(to be terminated): {inst_id}')
logger.debug(f'Terminating instance {inst_id}')

try:
oci_adaptor.get_core_client(
region,
oci_utils.oci_config.get_profile()).terminate_instance(
inst_id)
# Release the NSG reference so that the NSG can be
# deleted without waiting the instance being terminated.
if nsg_id is not None:
cls.detach_nsg(region, inst, nsg_id)

# Terminate the instance
core_client.terminate_instance(inst_id)

except oci_adaptor.oci.exceptions.ServiceError as e:
fail_count += 1
logger.error(f'Terminate instance failed: {str(e)}\n: {inst}')
Expand Down Expand Up @@ -468,5 +487,192 @@ def delete_vcn(cls, net_client, skypilot_vcn, skypilot_subnet,
logger.error(
f'Delete VCN {oci_utils.oci_config.VCN_NAME} Error: {str(e)}')

@classmethod
@debug_enabled(logger)
def find_nsg(cls, region: str, nsg_name: str,
create_if_not_exist: bool) -> Optional[str]:
net_client = oci_adaptor.get_net_client(
region, oci_utils.oci_config.get_profile())

compartment = cls.find_compartment(region)

list_vcns_resp = net_client.list_vcns(
compartment_id=compartment,
display_name=oci_utils.oci_config.VCN_NAME,
lifecycle_state='AVAILABLE',
)

if not list_vcns_resp:
raise exceptions.ResourcesUnavailableError(
'The VCN is not available')

# Get the primary vnic.
assert len(list_vcns_resp.data) > 0
vcn = list_vcns_resp.data[0]

list_nsg_resp = net_client.list_network_security_groups(
compartment_id=compartment,
vcn_id=vcn.id,
limit=1,
display_name=nsg_name,
)

nsgs = list_nsg_resp.data
if nsgs:
assert len(nsgs) == 1
return nsgs[0].id
elif not create_if_not_exist:
return None

# Continue to create new NSG if not exists
create_nsg_resp = net_client.create_network_security_group(
create_network_security_group_details=oci_adaptor.oci.core.models.
CreateNetworkSecurityGroupDetails(
compartment_id=compartment,
vcn_id=vcn.id,
display_name=nsg_name,
))
get_nsg_resp = net_client.get_network_security_group(
network_security_group_id=create_nsg_resp.data.id)
oci_adaptor.oci.wait_until(
net_client,
get_nsg_resp,
'lifecycle_state',
'AVAILABLE',
)

return get_nsg_resp.data.id

@classmethod
def get_range_min_max(cls, port_range: str) -> Tuple[int, int]:
range_list = port_range.split('-')
if len(range_list) == 1:
return (int(range_list[0]), int(range_list[0]))
from_port, to_port = range_list
return (int(from_port), int(to_port))

@classmethod
@debug_enabled(logger)
def create_nsg_rules(cls, region: str, cluster_name: str,
ports: List[str]) -> None:
""" Create per-cluster NSG with ingress rules """
if not ports:
return

net_client = oci_adaptor.get_net_client(
region, oci_utils.oci_config.get_profile())

nsg_name = oci_utils.oci_config.NSG_NAME_TEMPLATE.format(
cluster_name=cluster_name)
nsg_id = cls.find_nsg(region, nsg_name, create_if_not_exist=True)

filters = {constants.TAG_RAY_CLUSTER_NAME: cluster_name}
insts = query_helper.query_instances_by_tags(filters, region)
for inst in insts:
vnic = cls.get_instance_primary_vnic(
region=region,
inst_info={
'inst_id': inst.identifier,
'ad': inst.availability_domain,
'compartment': inst.compartment_id,
})
nsg_ids = vnic.nsg_ids
if not nsg_ids:
net_client.update_vnic(
vnic_id=vnic.id,
update_vnic_details=oci_adaptor.oci.core.models.
UpdateVnicDetails(nsg_ids=[nsg_id],
skip_source_dest_check=False),
)

# pylint: disable=line-too-long
list_nsg_rules_resp = net_client.list_network_security_group_security_rules(
network_security_group_id=nsg_id,
direction='INGRESS',
sort_by='TIMECREATED',
sort_order='DESC',
)

ingress_rules: List = list_nsg_rules_resp.data
existing_port_ranges: List[str] = []
for r in ingress_rules:
if r.tcp_options:
options_range = r.tcp_options.destination_port_range
rule_port_range = f'{options_range.min}-{options_range.max}'
existing_port_ranges.append(rule_port_range)

new_ports = resources_utils.port_ranges_to_set(ports)
existing_ports = resources_utils.port_ranges_to_set(
existing_port_ranges)
if new_ports.issubset(existing_ports):
# ports already contains in the existing rules, nothing to add.
return

# Determine the ports to be added, without overlapping.
ports_to_open = new_ports - existing_ports
port_ranges_to_open = resources_utils.port_set_to_ranges(ports_to_open)

new_rules = []
for port_range in port_ranges_to_open:
port_range_min, port_range_max = cls.get_range_min_max(port_range)
new_rules.append(
oci_adaptor.oci.core.models.AddSecurityRuleDetails(
direction='INGRESS',
protocol='6',
is_stateless=False,
source=oci_utils.oci_config.VCN_CIDR_INTERNET,
source_type='CIDR_BLOCK',
tcp_options=oci_adaptor.oci.core.models.TcpOptions(
destination_port_range=oci_adaptor.oci.core.models.
PortRange(min=port_range_min, max=port_range_max),),
description=oci_utils.oci_config.SERVICE_PORT_RULE_TAG,
))

net_client.add_network_security_group_security_rules(
network_security_group_id=nsg_id,
add_network_security_group_security_rules_details=oci_adaptor.oci.
core.models.AddNetworkSecurityGroupSecurityRulesDetails(
security_rules=new_rules),
)

@classmethod
@debug_enabled(logger)
def detach_nsg(cls, region: str, inst, nsg_id: Optional[str]) -> None:
if nsg_id is None:
return

vnic = cls.get_instance_primary_vnic(
region=region,
inst_info={
'inst_id': inst.identifier,
'ad': inst.availability_domain,
'compartment': inst.compartment_id,
})

# Detatch the NSG before removing it.
oci_adaptor.get_net_client(region, oci_utils.oci_config.get_profile(
)).update_vnic(
vnic_id=vnic.id,
update_vnic_details=oci_adaptor.oci.core.models.UpdateVnicDetails(
nsg_ids=[], skip_source_dest_check=False),
)

@classmethod
@debug_enabled(logger)
def remove_cluster_nsg(cls, region: str, cluster_name: str) -> None:
""" Remove NSG of the cluster """
net_client = oci_adaptor.get_net_client(
region, oci_utils.oci_config.get_profile())

nsg_name = oci_utils.oci_config.NSG_NAME_TEMPLATE.format(
cluster_name=cluster_name)
nsg_id = cls.find_nsg(region, nsg_name, create_if_not_exist=False)
if nsg_id is None:
return

# Delete the NSG
net_client.delete_network_security_group(
network_security_group_id=nsg_id)


query_helper = QueryHelper()
14 changes: 7 additions & 7 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ def _get_cloud_dependencies_installation_commands(
'pip list | grep runpod > /dev/null 2>&1 || '
'pip install "runpod>=1.5.1" > /dev/null 2>&1')
setup_clouds.append(str(cloud))
elif isinstance(cloud, clouds.OCI):
step_prefix = prefix_str.replace('<step>',
str(len(setup_clouds) + 1))
commands.append(f'echo -en "\\r{prefix_str}OCI{empty_str}" && '
'pip list | grep oci > /dev/null 2>&1 || '
'pip install oci > /dev/null 2>&1')
setup_clouds.append(str(cloud))
if controller == Controllers.JOBS_CONTROLLER:
if isinstance(cloud, clouds.IBM):
step_prefix = prefix_str.replace('<step>',
Expand All @@ -303,13 +310,6 @@ def _get_cloud_dependencies_installation_commands(
'pip install ibm-cloud-sdk-core ibm-vpc '
'ibm-platform-services ibm-cos-sdk > /dev/null 2>&1')
setup_clouds.append(str(cloud))
elif isinstance(cloud, clouds.OCI):
step_prefix = prefix_str.replace('<step>',
str(len(setup_clouds) + 1))
commands.append(f'echo -en "\\r{prefix_str}OCI{empty_str}" && '
'pip list | grep oci > /dev/null 2>&1 || '
'pip install oci > /dev/null 2>&1')
setup_clouds.append(str(cloud))
if (cloudflare.NAME
in storage_lib.get_cached_enabled_storage_clouds_or_refresh()):
step_prefix = prefix_str.replace('<step>', str(len(setup_clouds) + 1))
Expand Down
Loading

0 comments on commit e41ce2a

Please sign in to comment.