Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change workflow after clicking 'Apply changes' button #82

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Changed
- add error handling for apply changes action
- after clicking 'Apply changes', the workflow first attempts to create a new job immediately; if that is not possible, the job is scheduled for the future

## [1.0.2]

Expand Down
6 changes: 4 additions & 2 deletions backend/SC4SNMP_UI_backend/apply_changes/apply_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ def __init__(self) -> None:
mongo_config_collection.update_one(
{
"previous_job_start_time": {"$exists": True},
"currently_scheduled": {"$exists": True}}
"currently_scheduled": {"$exists": True},
"task_id": {"$exists": True}}
,{
"$set":{
"previous_job_start_time": None,
"currently_scheduled": False
"currently_scheduled": False,
"task_id": None
}
},
upsert=True
Expand Down
69 changes: 48 additions & 21 deletions backend/SC4SNMP_UI_backend/apply_changes/handling_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import ruamel.yaml
from flask import current_app
from SC4SNMP_UI_backend import mongo_client
from SC4SNMP_UI_backend.apply_changes.tasks import run_job
from SC4SNMP_UI_backend.apply_changes.tasks import run_job, get_job_config
from SC4SNMP_UI_backend.apply_changes.kubernetes_job import create_job
from kubernetes.client import ApiException
import datetime
import os

Expand All @@ -13,6 +15,7 @@
VALUES_DIRECTORY = os.getenv("VALUES_DIRECTORY", "")
VALUES_FILE = os.getenv("VALUES_FILE", "")
KEEP_TEMP_FILES = os.getenv("KEEP_TEMP_FILES", "false")
JOB_NAMESPACE = os.getenv("JOB_NAMESPACE", "sc4snmp")
mongo_config_collection = mongo_client.sc4snmp.config_collection
mongo_groups = mongo_client.sc4snmp.groups_ui
mongo_inventory = mongo_client.sc4snmp.inventory_ui
Expand Down Expand Up @@ -138,31 +141,55 @@ def handle(self, request: dict = None):
:return: pass dictionary with job_delay in seconds to the next handler
"""
record = list(mongo_config_collection.find())[0]
last_update = record["previous_job_start_time"]
if last_update is None:
# If it's the first time that the job is run (record in mongo_config_collection has been created
# in ApplyChanges class and last_update attribute is None) then job delay should be equal to
# CHANGES_INTERVAL_SECONDS. Update the mongo record with job state accordingly.
job_delay = CHANGES_INTERVAL_SECONDS
schedule_new_job = True
# get_job_config return job configuration in "job" variable and BatchV1Api from kubernetes client
job, batch_v1 = get_job_config()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is batch_v1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explained in the new comment in code

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is OK, but maybe we could move it into the docstring of get_job_config() instead of keeping it as an inline comment?

if job is None or batch_v1 is None:
raise ValueError("CheckJobHandler: Job configuration is empty")
try:
# Try creating a new kubernetes job immediately. If the previous job is still present in the namespace,
# ApiException will be thrown.
create_job(batch_v1, job, JOB_NAMESPACE)
task_id = record["task_id"]
if task_id is not None:
# revoke existing Celery task with the previously scheduled job
current_app.extensions["celery"].control.revoke(task_id,
terminate=True, signal='SIGKILL')
mongo_config_collection.update_one({"_id": record["_id"]},
{"$set": {"previous_job_start_time": datetime.datetime.utcnow()}})
# time from the last update
{"$set": {"previous_job_start_time": datetime.datetime.utcnow(),
"currently_scheduled": False,
"task_id": None}})
job_delay = 1
time_difference = 0
else:
schedule_new_job = False
except ApiException:
# Check how many seconds have elapsed since the last time that the job was run. If the time difference
# is greater than CHANGES_INTERVAL_SECONDS then job can be run immediately. Otherwise, calculate how
# is greater than CHANGES_INTERVAL_SECONDS then job can be scheduled within 1 second. Otherwise, calculate how
# many seconds are left until minimum time difference between updates (CHANGES_INTERVAL_SECONDS).
current_time = datetime.datetime.utcnow()
delta = current_time - last_update
time_difference = delta.total_seconds()
if time_difference > CHANGES_INTERVAL_SECONDS:
job_delay = 1
last_update = record["previous_job_start_time"]
if last_update is None:
# If it's the first time that the job is run (record in mongo_config_collection has been created
# in ApplyChanges class and last_update attribute is None) but the previous job is still in the namespace
# then job delay should be equal to CHANGES_INTERVAL_SECONDS.
# Update the mongo record with job state accordingly.
job_delay = CHANGES_INTERVAL_SECONDS
mongo_config_collection.update_one({"_id": record["_id"]},
{"$set": {"previous_job_start_time": datetime.datetime.utcnow()}})
# time from the last update
time_difference = 0
else:
job_delay = int(CHANGES_INTERVAL_SECONDS - time_difference)
current_time = datetime.datetime.utcnow()
delta = current_time - last_update
time_difference = delta.total_seconds()
if time_difference > CHANGES_INTERVAL_SECONDS:
job_delay = 1
else:
job_delay = int(CHANGES_INTERVAL_SECONDS - time_difference)

result = {
"job_delay": job_delay,
"time_from_last_update": time_difference
"time_from_last_update": time_difference,
"schedule_new_job": schedule_new_job
}

current_app.logger.info(f"CheckJobHandler: {result}")
Expand All @@ -175,11 +202,11 @@ def handle(self, request: dict):
ScheduleHandler schedules the kubernetes job with updated sc4snmp configuration
"""
record = list(mongo_config_collection.find())[0]
if not record["currently_scheduled"]:
if not record["currently_scheduled"] and request["schedule_new_job"]:
# If the task isn't currently scheduled, schedule it and update its state in mongo.
async_result = run_job.apply_async(countdown=request["job_delay"], queue='apply_changes')
mongo_config_collection.update_one({"_id": record["_id"]},
{"$set": {"currently_scheduled": True}})
run_job.apply_async(countdown=request["job_delay"], queue='apply_changes')
{"$set": {"currently_scheduled": True, "task_id": async_result.id}})
current_app.logger.info(
f"ScheduleHandler: scheduling new task with the delay of {request['job_delay']} seconds.")
else:
Expand Down
23 changes: 17 additions & 6 deletions backend/SC4SNMP_UI_backend/apply_changes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
JOB_CONFIG_PATH = os.getenv("JOB_CONFIG_PATH", "/config/job_config.yaml")
celery_logger = get_task_logger(__name__)

@shared_task()
def run_job():
def get_job_config():
"""
:return: job - configuration of the job
batch_v1 - BatchV1Api object from kubernetes client
"""
job = None
batch_v1 = None
with open(JOB_CONFIG_PATH, encoding="utf-8") as file:
Expand All @@ -26,6 +29,13 @@ def run_job():
config.load_incluster_config()
batch_v1 = client.BatchV1Api()
job = create_job_object(config_file)
return job, batch_v1

@shared_task()
def run_job():
job, batch_v1 = get_job_config()
if job is None or batch_v1 is None:
raise ValueError("Scheduled kubernetes job: Job configuration is empty")

with MongoClient(MONGO_URI) as connection:
try_creating = True
Expand All @@ -39,8 +49,9 @@ def run_job():
try:
record = list(connection.sc4snmp.config_collection.find())[0]
connection.sc4snmp.config_collection.update_one({"_id": record["_id"]},
{"$set": {"previous_job_start_time": datetime.datetime.utcnow(),
"currently_scheduled": False}})
{"$set": {"previous_job_start_time": datetime.datetime.utcnow(),
"currently_scheduled": False,
"task_id": None}})
except Exception as e:
celery_logger.info(f"Error occurred while updating job state after job creation: {str(e)}")
except ApiException:
Expand All @@ -50,6 +61,6 @@ def run_job():
celery_logger.info(f"Kubernetes job was not created. Max retries ({JOB_CREATION_RETRIES}) exceeded.")
record = list(connection.sc4snmp.config_collection.find())[0]
connection.sc4snmp.config_collection.update_one({"_id": record["_id"]},
{"$set": {"currently_scheduled": False}})
{"$set": {"currently_scheduled": False, "task_id": None}})
else:
time.sleep(10)
time.sleep(10)
Loading
Loading