Skip to content

Commit

Permalink
feat: custom_scaler desired_replica_count
Browse files Browse the repository at this point in the history
  • Loading branch information
hagzag committed Dec 9, 2023
1 parent 57d5738 commit f2536ad
Show file tree
Hide file tree
Showing 13 changed files with 807 additions and 64 deletions.
25 changes: 15 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
# keda-python-poc



## starting local demo

1. git clone this repo
2. poetry install
3. poetry run uvicorn api:app --reload
2. task run-api-local
2. task run-grpc-local

[see also this section below](#run-localy)

## get scale - discover default values

```sh
curl -X GET localhost:8000/scale
curl -X GET localhost:9090/scale
```

yiedls: `{"desiredReplicas":3}`

## Configure "workers" scale

```sh
curl -X POST http://localhost:8000/scale -H "Content-Type: application/json" -d '{"desiredReplicas": 9}'
curl -X POST http://localhost:9090/scale -H "Content-Type: application/json" -d '{"desiredReplicas": 9}'
```

## Get new scale

```sh
curl -X GET localhost:8000/scale
curl -X GET localhost:9090/scale
```

yiedls: `{"desiredReplicas":9}`
Expand All @@ -37,21 +36,27 @@ yiedls: `{"desiredReplicas":9}`
run `task run-api-local` which will run the following commands for you:

- poetry install --no-root
- poetry env info
- poetry run uvicorn api:app --reload
- poetry run uvicorn src.api.main:app --port 9090 --reload

run `task run-grpc-local` which will run the following commands for you:

- poetry run python src/custom_scaler/grpc_server.py

run `task run-worker-local` which will run the following commands for you:

- poetry install --no-root
- poetry env info
- poetry run uvicorn worker:app --reload
- poetry run uvicorn src.worker.main:app --port 8080 --reload

## Build a container

run `task docker-build` which will run the following commands:

- `docker build -t 'your-repo/keda-poc:0.0.1'`

## Run via docker

TBC

## add keda scaling object

1. access to a given cluster - use a "Palygrond" cluster
Expand Down
18 changes: 12 additions & 6 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,30 @@ dotenv: [ '.env' ]

tasks:


run-local:
run-api-local:
desc: Build the Python - api / master application
dir: '{{.USER_WORKING_DIR}}'
cmds:
- poetry install --no-root
- poetry env info
- poetry run uvicorn api:app --reload
- poetry run uvicorn src.api.main:app --port 9090 --reload

run-api-local:
run-grpc-server:
desc: serve the protobug for the scaler
dir: '{{.USER_WORKING_DIR}}'
env:
LOG_LEVEL: "DEBUG"
SCALER_API_EP: http://localhost:9090/scale
GRPC_PORT: 50052
cmds:
- task: run-local
- poetry run python src/custom_scaler/grpc_server.py

run-worker-local:
desc: Build the Python - worker application
cmds:
- poetry install --no-root
- poetry env info
- poetry run uvicorn worker:app --reload
- poetry run uvicorn src.worker.main:app --port 8080 --reload

docker-build:
desc: Build a Docker container
Expand Down
357 changes: 356 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ uvicorn = "^0.24.0.post1"
fastapi = "^0.104.1"
pyyaml = "^6.0.1"
prometheus-fastapi-instrumentator = "^6.1.0"


grpcio = "^1.59.3"
grpcio-tools = "^1.59.3"
grpcio-reflection = "^1.59.3"
requests = "^2.31.0"
fastapi-healthchecks = "^1.0.7"

[build-system]
requires = ["poetry-core"]
Expand Down
28 changes: 23 additions & 5 deletions req.http
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,39 @@
###
# get replica count
###
GET http://localhost:8000/scale HTTP/1.1
GET http://localhost:9090/ HTTP/1.1
content-type: text/plain

###
# set replica count
# set (POST) replica count
###
POST http://localhost:8000/scale HTTP/1.1
POST http://localhost:9090/scale HTTP/1.1
content-type: application/json

{"desiredReplicas": 4}
{"desiredReplicas": 7}

###
# get (GET) replica count via `/scale`
###
GET http://localhost:9090/scale HTTP/1.1
content-type: text/plain

###
# grpc cant be tested in this file ::shrug::
###
# GET http://localhost:50051/IsActive HTTP/1.1
# content-type: grpc
#
# use grpcurl
# grpcurl -plaintext localhost:50051 externalscaler.ExternalScaler.IsActive
# {
# "result": true
# }

###
# healthcheck
###
GET http://localhost:8000/health HTTP/1.1
GET http://localhost:9090/health HTTP/1.1
content-type: application/json

###
Expand Down
7 changes: 4 additions & 3 deletions api.py → src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@ class ScaleResponse(BaseModel):
class ScaleRequest(BaseModel):
desiredReplicas: conint(ge=0, le=9) # Constrained int between 0 and 9

@app.get("/")
def get_scale():
return Response(content=str("OK"), media_type="text/plain")

@app.get("/health")
async def health_check():
# Simple health check endpoint
return {"status": "OK"}

@app.get("/scale", response_model=ScaleResponse)
def get_scale():
# return ScaleResponse(desiredReplicas=current_scale)
return Response(content=str(current_scale), media_type="text/plain")


@app.post("/scale", response_model=ScaleResponse)
def set_scale(scale_request: ScaleRequest):
global current_scale
current_scale = scale_request.desiredReplicas
# return ScaleResponse(desiredReplicas=current_scale)
return Response(content=str(current_scale), media_type="text/plain")


Expand Down
70 changes: 70 additions & 0 deletions src/custom_scaler/custom_scaler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os
import time
import requests
import logging
from urllib.parse import urlparse
import externalscaler_pb2_grpc
import externalscaler_pb2

# Set logging level based on LOG_LEVEL environment variable
log_level = logging.DEBUG if os.getenv('LOG_LEVEL') == 'DEBUG' else logging.INFO
logging.basicConfig(level=log_level)

class CustomScaler(externalscaler_pb2_grpc.ExternalScalerServicer):
def fetch_metric_value(self):
"""Fetches the metric value from the scaler API endpoint."""
scaler_api_endpoint = os.getenv('SCALER_API_EP', 'http://localhost:9090/scale')
logging.debug(f"Fetching metric value from: {scaler_api_endpoint}")

# Validate the URL
if not urlparse(scaler_api_endpoint).scheme:
logging.error(f"Invalid URL: {scaler_api_endpoint}")
return 0

try:
response = requests.get(scaler_api_endpoint)
response.raise_for_status()
logging.debug(f"Response received: {response.text.strip()}")
return response.text.strip()
except requests.RequestException as e:
logging.error(f"Error fetching metric value from {scaler_api_endpoint}: {e}")
return 0

def IsActive(self, request, context):
metric_value = self.fetch_metric_value()
is_active = metric_value != "0"
logging.debug(f"IsActive check: {is_active}")
return externalscaler_pb2.IsActiveResponse(result=is_active)

def StreamIsActive(self, request, context):
while True:
metric_value = self.fetch_metric_value()
is_active = metric_value != "0"
logging.debug(f"StreamIsActive check: {is_active}")
yield externalscaler_pb2.IsActiveResponse(result=is_active)
time.sleep(1)

def GetMetricSpec(self, request, context):
target_size = self.fetch_metric_value()
try:
target_size = int(target_size)
except ValueError:
logging.error(f"Invalid target size value: {target_size}")
target_size = 0

metric_spec = externalscaler_pb2.MetricSpec(metricName="desired_replica_count", targetSize=target_size)
logging.debug(f"GetMetricSpec response: {metric_spec}")
return externalscaler_pb2.GetMetricSpecResponse(metricSpecs=[metric_spec])

def GetMetrics(self, request, context):
metric_value = self.fetch_metric_value()
try:
metric_value = int(metric_value)
except ValueError:
logging.error(f"Invalid metric value: {metric_value}")
metric_value = 0

logging.debug(f"GetMetrics response: MetricValue(metricName='desired_replica_count', metricValue={metric_value})")
return externalscaler_pb2.GetMetricsResponse(metricValues=[externalscaler_pb2.MetricValue(metricName="desired_replica_count", metricValue=metric_value)])

# Additional server setup code goes here
44 changes: 44 additions & 0 deletions src/custom_scaler/externalscaler.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
syntax = "proto3";

package externalscaler;
option go_package = ".;externalscaler";

service ExternalScaler {
rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
}

message ScaledObjectRef {
string name = 1;
string namespace = 2;
map<string, string> scalerMetadata = 3;
}

message IsActiveResponse {
bool result = 1;
}

message GetMetricSpecResponse {
repeated MetricSpec metricSpecs = 1;
}

message MetricSpec {
string metricName = 1;
int64 targetSize = 2;
}

message GetMetricsRequest {
ScaledObjectRef scaledObjectRef = 1;
string metricName = 2;
}

message GetMetricsResponse {
repeated MetricValue metricValues = 1;
}

message MetricValue {
string metricName = 1;
int64 metricValue = 2;
}
44 changes: 44 additions & 0 deletions src/custom_scaler/externalscaler_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f2536ad

Please sign in to comment.