Skip to content

Commit

Permalink
feat: change workflow after clicking 'Apply changes' button (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekzyla authored May 23, 2024
1 parent b311fd4 commit 99f85d0
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Changed
- add error handling for apply changes action
- after clicking 'Apply changes' workflow is initially attempting to create new job immediately, if it is impossible, schedule it for the future

## [1.0.2]

Expand Down
6 changes: 4 additions & 2 deletions backend/SC4SNMP_UI_backend/apply_changes/apply_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ def __init__(self) -> None:
mongo_config_collection.update_one(
{
"previous_job_start_time": {"$exists": True},
"currently_scheduled": {"$exists": True}}
"currently_scheduled": {"$exists": True},
"task_id": {"$exists": True}}
,{
"$set":{
"previous_job_start_time": None,
"currently_scheduled": False
"currently_scheduled": False,
"task_id": None
}
},
upsert=True
Expand Down
69 changes: 48 additions & 21 deletions backend/SC4SNMP_UI_backend/apply_changes/handling_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import ruamel.yaml
from flask import current_app
from SC4SNMP_UI_backend import mongo_client
from SC4SNMP_UI_backend.apply_changes.tasks import run_job
from SC4SNMP_UI_backend.apply_changes.tasks import run_job, get_job_config
from SC4SNMP_UI_backend.apply_changes.kubernetes_job import create_job
from kubernetes.client import ApiException
import datetime
import os

Expand All @@ -13,6 +15,7 @@
VALUES_DIRECTORY = os.getenv("VALUES_DIRECTORY", "")
VALUES_FILE = os.getenv("VALUES_FILE", "")
KEEP_TEMP_FILES = os.getenv("KEEP_TEMP_FILES", "false")
JOB_NAMESPACE = os.getenv("JOB_NAMESPACE", "sc4snmp")
mongo_config_collection = mongo_client.sc4snmp.config_collection
mongo_groups = mongo_client.sc4snmp.groups_ui
mongo_inventory = mongo_client.sc4snmp.inventory_ui
Expand Down Expand Up @@ -138,31 +141,55 @@ def handle(self, request: dict = None):
:return: pass dictionary with job_delay in seconds to the next handler
"""
record = list(mongo_config_collection.find())[0]
last_update = record["previous_job_start_time"]
if last_update is None:
# If it's the first time that the job is run (record in mongo_config_collection has been created
# in ApplyChanges class and last_update attribute is None) then job delay should be equal to
# CHANGES_INTERVAL_SECONDS. Update the mongo record with job state accordingly.
job_delay = CHANGES_INTERVAL_SECONDS
schedule_new_job = True
# get_job_config return job configuration in "job" variable and BatchV1Api from kubernetes client
job, batch_v1 = get_job_config()
if job is None or batch_v1 is None:
raise ValueError("CheckJobHandler: Job configuration is empty")
try:
# Try creating a new kubernetes job immediately. If the previous job is still present in the namespace,
# ApiException will be thrown.
create_job(batch_v1, job, JOB_NAMESPACE)
task_id = record["task_id"]
if task_id is not None:
# revoke existing Celery task with the previously scheduled job
current_app.extensions["celery"].control.revoke(task_id,
terminate=True, signal='SIGKILL')
mongo_config_collection.update_one({"_id": record["_id"]},
{"$set": {"previous_job_start_time": datetime.datetime.utcnow()}})
# time from the last update
{"$set": {"previous_job_start_time": datetime.datetime.utcnow(),
"currently_scheduled": False,
"task_id": None}})
job_delay = 1
time_difference = 0
else:
schedule_new_job = False
except ApiException:
# Check how many seconds have elapsed since the last time that the job was run. If the time difference
# is greater than CHANGES_INTERVAL_SECONDS then job can be run immediately. Otherwise, calculate how
# is greater than CHANGES_INTERVAL_SECONDS then job can be scheduled within 1 second. Otherwise, calculate how
# many seconds are left until minimum time difference between updates (CHANGES_INTERVAL_SECONDS).
current_time = datetime.datetime.utcnow()
delta = current_time - last_update
time_difference = delta.total_seconds()
if time_difference > CHANGES_INTERVAL_SECONDS:
job_delay = 1
last_update = record["previous_job_start_time"]
if last_update is None:
# If it's the first time that the job is run (record in mongo_config_collection has been created
# in ApplyChanges class and last_update attribute is None) but the previous job is still in the namespace
# then job delay should be equal to CHANGES_INTERVAL_SECONDS.
# Update the mongo record with job state accordingly.
job_delay = CHANGES_INTERVAL_SECONDS
mongo_config_collection.update_one({"_id": record["_id"]},
{"$set": {"previous_job_start_time": datetime.datetime.utcnow()}})
# time from the last update
time_difference = 0
else:
job_delay = int(CHANGES_INTERVAL_SECONDS - time_difference)
current_time = datetime.datetime.utcnow()
delta = current_time - last_update
time_difference = delta.total_seconds()
if time_difference > CHANGES_INTERVAL_SECONDS:
job_delay = 1
else:
job_delay = int(CHANGES_INTERVAL_SECONDS - time_difference)

result = {
"job_delay": job_delay,
"time_from_last_update": time_difference
"time_from_last_update": time_difference,
"schedule_new_job": schedule_new_job
}

current_app.logger.info(f"CheckJobHandler: {result}")
Expand All @@ -175,11 +202,11 @@ def handle(self, request: dict):
ScheduleHandler schedules the kubernetes job with updated sc4snmp configuration
"""
record = list(mongo_config_collection.find())[0]
if not record["currently_scheduled"]:
if not record["currently_scheduled"] and request["schedule_new_job"]:
# If the task isn't currently scheduled, schedule it and update its state in mongo.
async_result = run_job.apply_async(countdown=request["job_delay"], queue='apply_changes')
mongo_config_collection.update_one({"_id": record["_id"]},
{"$set": {"currently_scheduled": True}})
run_job.apply_async(countdown=request["job_delay"], queue='apply_changes')
{"$set": {"currently_scheduled": True, "task_id": async_result.id}})
current_app.logger.info(
f"ScheduleHandler: scheduling new task with the delay of {request['job_delay']} seconds.")
else:
Expand Down
23 changes: 17 additions & 6 deletions backend/SC4SNMP_UI_backend/apply_changes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
JOB_CONFIG_PATH = os.getenv("JOB_CONFIG_PATH", "/config/job_config.yaml")
celery_logger = get_task_logger(__name__)

@shared_task()
def run_job():
def get_job_config():
"""
:return: job - configuration of the job
batch_v1 - BatchV1Api object from kubernetes client
"""
job = None
batch_v1 = None
with open(JOB_CONFIG_PATH, encoding="utf-8") as file:
Expand All @@ -26,6 +29,13 @@ def run_job():
config.load_incluster_config()
batch_v1 = client.BatchV1Api()
job = create_job_object(config_file)
return job, batch_v1

@shared_task()
def run_job():
job, batch_v1 = get_job_config()
if job is None or batch_v1 is None:
raise ValueError("Scheduled kubernetes job: Job configuration is empty")

with MongoClient(MONGO_URI) as connection:
try_creating = True
Expand All @@ -39,8 +49,9 @@ def run_job():
try:
record = list(connection.sc4snmp.config_collection.find())[0]
connection.sc4snmp.config_collection.update_one({"_id": record["_id"]},
{"$set": {"previous_job_start_time": datetime.datetime.utcnow(),
"currently_scheduled": False}})
{"$set": {"previous_job_start_time": datetime.datetime.utcnow(),
"currently_scheduled": False,
"task_id": None}})
except Exception as e:
celery_logger.info(f"Error occurred while updating job state after job creation: {str(e)}")
except ApiException:
Expand All @@ -50,6 +61,6 @@ def run_job():
celery_logger.info(f"Kubernetes job was not created. Max retries ({JOB_CREATION_RETRIES}) exceeded.")
record = list(connection.sc4snmp.config_collection.find())[0]
connection.sc4snmp.config_collection.update_one({"_id": record["_id"]},
{"$set": {"currently_scheduled": False}})
{"$set": {"currently_scheduled": False, "task_id": None}})
else:
time.sleep(10)
time.sleep(10)
Loading

0 comments on commit 99f85d0

Please sign in to comment.