Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SkyServe] deprecate old serve autoscaler policy #2868

Merged
merged 10 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions sky/serve/autoscalers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

logger = sky_logging.init_logger(__name__)

_UPSCALE_DELAY_S = 300
MaoZiming marked this conversation as resolved.
Show resolved Hide resolved
_DOWNSCALE_DELAY_S = 1200


class AutoscalerDecisionOperator(enum.Enum):
SCALE_UP = 'scale_up'
Expand Down Expand Up @@ -86,25 +89,31 @@ class RequestRateAutoscaler(Autoscaler):
"""

def __init__(self, spec: 'service_spec.SkyServiceSpec', frequency: int,
cooldown: int, rps_window_size: int) -> None:
rps_window_size: int) -> None:
"""Initialize the request rate autoscaler.

Variables:
upper_threshold: Upper threshold for scale up. If None, no scale up.
lower_threshold: Lower threshold for scale down. If None, no scale
down.
cooldown: Cooldown between two scaling operations in seconds.
rps_window_size: Window size for rps calculating.
last_scale_operation: Time of last scale operation.
request_timestamps: All request timestamps within the window.
upscale_counter: counter for upscale number of replicas.
downscale_counter: counter for downscale number of replicas.
scale_up_consecutive_periods: period for scaling up.
scale_down_consecutive_periods: period for scaling down.
"""
super().__init__(spec, frequency)
self.upper_threshold: Optional[float] = spec.qps_upper_threshold
self.lower_threshold: Optional[float] = spec.qps_lower_threshold
self.cooldown: int = cooldown
self.rps_window_size: int = rps_window_size
self.last_scale_operation: float = 0.
self.request_timestamps: List[float] = []
self.upscale_counter: int = 0
self.downscale_counter: int = 0
self.scale_up_consecutive_periods: int = int(_UPSCALE_DELAY_S /
self.frequency)
self.scale_down_consecutive_periods: int = int(_DOWNSCALE_DELAY_S /
self.frequency)

def collect_request_information(
self, request_aggregator_info: Dict[str, Any]) -> None:
Expand All @@ -127,21 +136,8 @@ def evaluate_scaling(
self,
replica_infos: List['replica_managers.ReplicaInfo'],
) -> AutoscalerDecision:
current_time = time.time()
num_replicas = len(replica_infos)

# Check if cooldown period has passed since the last scaling operation.
# Only cooldown if bootstrapping is done.
if num_replicas >= self.min_replicas:
if current_time - self.last_scale_operation < self.cooldown:
logger.info(
f'Current time: {current_time}, '
f'last scale operation: {self.last_scale_operation}, '
f'cooldown: {self.cooldown}')
logger.info('Cooldown period has not passed since last scaling '
'operation. Skipping scaling.')
return AutoscalerDecision(AutoscalerDecisionOperator.NO_OP,
target=None)
num_replicas = len(replica_infos)

# Convert to requests per second.
num_requests_per_second = len(
Expand All @@ -150,9 +146,11 @@ def evaluate_scaling(
requests_per_replica = (num_requests_per_second / num_replicas
if num_replicas else num_requests_per_second)

logger.info(f'Requests per replica: {requests_per_replica}')

logger.info(f'Requests per replica: {requests_per_replica}, '
f'upper_threshold: {self.upper_threshold}, '
f'lower_threshold: {self.lower_threshold}')
logger.info(f'Number of replicas: {num_replicas}')

target_num_replicas = num_replicas
if num_replicas < self.min_replicas:
target_num_replicas = self.min_replicas
Expand All @@ -167,7 +165,37 @@ def evaluate_scaling(

target_num_replicas = max(self.min_replicas,
min(self.max_replicas, target_num_replicas))
num_replicas_delta = target_num_replicas - num_replicas

logger.info(f'Target number of replicas: {target_num_replicas}, '
f'min_replicas: {self.min_replicas}, '
f'max_replicas: {self.max_replicas}')

num_replicas_delta = 0
if num_replicas < self.min_replicas:
num_replicas_delta = self.min_replicas - num_replicas
self.upscale_counter = 0
elif num_replicas > self.max_replicas:
num_replicas_delta = self.max_replicas - num_replicas
self.downscale_counter = 0
elif target_num_replicas > num_replicas:
self.upscale_counter += 1
self.downscale_counter = 0
if self.upscale_counter >= self.scale_up_consecutive_periods:
MaoZiming marked this conversation as resolved.
Show resolved Hide resolved
self.upscale_counter = 0
num_replicas_delta = target_num_replicas - num_replicas
elif target_num_replicas < num_replicas:
self.downscale_counter += 1
self.upscale_counter = 0
if self.downscale_counter >= self.scale_down_consecutive_periods:
self.downscale_counter = 0
num_replicas_delta = target_num_replicas - num_replicas
else:
self.upscale_counter = self.downscale_counter = 0
logger.info(f'Upscale counter: {self.upscale_counter}/'
f'{self.scale_up_consecutive_periods}. '
f'Downscale counter: {self.downscale_counter}/'
f'{self.scale_down_consecutive_periods}')

if num_replicas_delta == 0:
logger.info('No scaling needed.')
return AutoscalerDecision(AutoscalerDecisionOperator.NO_OP,
Expand Down
4 changes: 0 additions & 4 deletions sky/serve/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
# Autoscaler scale frequency in seconds. We will try to scale up/down every
# `scale_frequency`.
AUTOSCALER_SCALE_FREQUENCY_SECONDS = 20
# Autoscaler cooldown time in seconds. We will not scale up/down if the last
# scale up/down is within this cooldown time.
AUTOSCALER_COOLDOWN_SECONDS = 60

# The default controller resources. We need 200 GB disk space to enable using
# Azure as controller, since its default image size is 150 GB.
# TODO(tian): We might need to be careful that service logs can take a lot of
Expand Down
1 change: 0 additions & 1 deletion sky/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def __init__(self, service_name: str, service_spec: serve.SkyServiceSpec,
autoscalers.RequestRateAutoscaler(
service_spec,
frequency=constants.AUTOSCALER_SCALE_FREQUENCY_SECONDS,
cooldown=constants.AUTOSCALER_COOLDOWN_SECONDS,
rps_window_size=constants.AUTOSCALER_RPS_WINDOW_SIZE_SECONDS))
self._port = port
self._app = fastapi.FastAPI()
Expand Down
41 changes: 41 additions & 0 deletions tests/test_serve_autoscaler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import argparse
import time

import requests


def send_get_request(ip, port):
url = f"http://{ip}:{port}"
try:
response = requests.get(url)
print(
f"GET request sent to {url} | Status code: {response.status_code}")
except requests.RequestException as e:
print(f"Error sending GET request to {url}: {e}")


def main():
parser = argparse.ArgumentParser(
description="Send GET requests to a specified IP address and port.")
parser.add_argument("--ip-address",
help="The IP address to send requests to.")
parser.add_argument("--port", type=int, help="The port number to use.")
parser.add_argument(
"--frequency",
type=int,
default=5,
help="Sending frequency in seconds (default is 5 seconds).")

args = parser.parse_args()

ip_address = args.ip_address
port = args.port
sending_frequency_seconds = args.frequency

while True:
send_get_request(ip_address, port)
time.sleep(sending_frequency_seconds)


if __name__ == "__main__":
main()
26 changes: 26 additions & 0 deletions tests/test_yamls/test_serve_autoscaler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SkyServe YAML to run a simple http server.
#
# Usage:
# sky serve up -n http examples/serve/http_server/task.yaml
# The endpoint will be printed in the console. You
# could also check the endpoint by running:
# sky serve status --endpoint http

service:
readiness_probe:
path: /health
initial_delay_seconds: 20

replica_policy:
min_replicas: 1
max_replicas: 3
qps_upper_threshold: 1
qps_lower_threshold: 0.5

resources:
ports: 8081
cpus: 2+

workdir: examples/serve/http_server

run: python3 server.py