diff --git a/conf/log/function_pipeline.conf b/conf/log/function_pipeline.conf
new file mode 100644
index 000000000..c1df93613
--- /dev/null
+++ b/conf/log/function_pipeline.conf
@@ -0,0 +1,46 @@
+{
+    "version": 1,
+    "disable_existing_loggers": false,
+    "formatters": {
+        "detailed": {
+            "class": "logging.Formatter",
+            "format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s"
+        }
+    },
+    "handlers": {
+        "errors": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "level": "ERROR",
+            "formatter": "detailed",
+            "filename": "/var/tmp/function_pipeline-errors.log",
+            "mode": "a",
+            "maxBytes": 1073741824,
+            "backupCount": 2,
+            "encoding": "UTF-8"
+        },
+        "file": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "level": "DEBUG",
+            "formatter": "detailed",
+            "filename": "/var/tmp/function_pipeline.log",
+            "mode": "a",
+            "maxBytes": 1073741824,
+            "backupCount": 8,
+            "encoding": "UTF-8"
+        },
+        "console": {
+            "class": "logging.StreamHandler",
+            "level": "WARNING",
+            "formatter": "detailed",
+            "stream": "ext://sys.stdout"
+        }
+    },
+    "root": {
+        "handlers": [
+            "console",
+            "file",
+            "errors"
+        ],
+        "level": "DEBUG"
+    }
+}
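For reference, a minimal sketch of how this config would be consumed, using only the standard-library `logging.config.dictConfig` (the relative path assumes the repo root as the working directory):

```python
import json
import logging
import logging.config

# Minimal sketch: load the dict-based config above and exercise each handler.
# Assumes the working directory is the repo root so the relative path resolves.
with open("conf/log/function_pipeline.conf", "r") as cf:
    logging.config.dictConfig(json.load(cf))

logging.debug("reaches function_pipeline.log only (file handler, DEBUG+)")
logging.warning("also printed to stdout (console handler, WARNING+)")
logging.error("additionally written to function_pipeline-errors.log (errors handler, ERROR+)")
```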
diff --git a/emission/functions/time_functions.py b/emission/functions/time_functions.py
new file mode 100644
index 000000000..09e0eb807
--- /dev/null
+++ b/emission/functions/time_functions.py
@@ -0,0 +1,82 @@
+import json
+import logging
+import logging.config
+import os
+import time
+import numpy as np
+import arrow
+import pymongo
+
+import emission.core.get_database as edb
+import emission.core.timer as ect
+
+import emission.storage.decorations.stats_queries as esds
+
+def run_function_pipeline(process_number, function_list, skip_if_no_new_data=False):
+    """
+    Run the function pipeline with the specified process number and function list.
+    Note that process_number is only used to customize the log file name.
+    We could avoid passing it in by using the process id (os.getpid()) instead, but
+    then we would lose the nice RotatingFileHandler properties, such as automatically
+    deleting files if there are too many. Maybe it would work with logrotate? Need to check.
+
+    :param process_number: id representing the process number. In range (0..n)
+    :param function_list: the list of functions that this process will handle
+    :param skip_if_no_new_data: flag to skip function execution based on custom logic
+    :return: None
+    """
+    try:
+        with open("conf/log/function_pipeline.conf", "r") as cf:
+            pipeline_log_config = json.load(cf)
+    except FileNotFoundError:
+        with open("conf/log/function_pipeline.conf.sample", "r") as cf:
+            pipeline_log_config = json.load(cf)
+
+    # Customize log filenames with the process number
+    pipeline_log_config["handlers"]["file"]["filename"] = \
+        pipeline_log_config["handlers"]["file"]["filename"].replace("function_pipeline", f"function_pipeline_{process_number}")
+    pipeline_log_config["handlers"]["errors"]["filename"] = \
+        pipeline_log_config["handlers"]["errors"]["filename"].replace("function_pipeline", f"function_pipeline_{process_number}")
+
+    logging.config.dictConfig(pipeline_log_config)
+    np.random.seed(61297777)
+
+    logging.info(f"Processing function list: {[func.__name__ for func in function_list]}")
+
+    for func in function_list:
+        # Check for None before reading __name__, otherwise this would raise
+        # AttributeError instead of skipping the entry
+        if func is None:
+            continue
+        func_name = func.__name__
+
+        try:
+            run_function_pipeline_step(func, skip_if_no_new_data)
+        except Exception as e:
+            esds.store_function_error(func_name, "WHOLE_PIPELINE", time.time(), None)
+            logging.exception(f"Found error {e} while processing pipeline for function {func_name}, skipping")
+
+def run_function_pipeline_step(func, skip_if_no_new_data):
+    """
+    Execute a single step in the function pipeline.
+
+    :param func: the function to execute
+    :param skip_if_no_new_data: flag to determine if the rest of the pipeline should be skipped
+    :return: None
+    """
+    func_name = func.__name__
+
+    with ect.Timer() as timer:
+        logging.info(f"********** Function {func_name}: Starting execution **********")
+        print(f"{arrow.now()} ********** Function {func_name}: Starting execution **********")
+        result = func()
+
+    # Store the execution time
+    esds.store_function_time(func_name, "EXECUTION",
+                             time.time(), timer.elapsed)
+
+    if skip_if_no_new_data and not result:
+        print(f"No new data for {func_name}, and skip_if_no_new_data = {skip_if_no_new_data}, skipping the rest of the pipeline")
+        return
+    else:
+        print(f"Function {func_name} executed with result = {result} and skip_if_no_new_data = {skip_if_no_new_data}, continuing")
+
+    logging.info(f"********** Function {func_name}: Completed execution **********")
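Since `process_number` exists only to keep per-process log files apart, a hypothetical driver that shards the function list across workers might look like the sketch below (the worker count and round-robin sharding are illustrative assumptions, not part of this diff):

```python
import multiprocessing as mp

from emission.functions.time_functions import run_function_pipeline

def launch_workers(function_list, num_workers=2):
    # Hypothetical driver (not in this diff): shard functions round-robin so
    # each worker writes its own function_pipeline_<n>.log / -errors.log pair.
    shards = [function_list[i::num_workers] for i in range(num_workers)]
    workers = [mp.Process(target=run_function_pipeline, args=(n, shard, True))
               for n, shard in enumerate(shards)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```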
- """ - def decorator(func: Callable): - @wraps(func) - def wrapper(*args, **kwargs): - print(f"Decorator invoked for {func.__name__}") - stage_string = func.__name__ - ts = time.time() - logging.info(f"Starting '{stage_string}' execution.") - start_time = time.time() - try: - result = func(*args, **kwargs) - success = True - return result - except Exception as e: - success = False - logging.error(f"Error in '{stage_string}': {e}", exc_info=True) - raise - finally: - end_time = time.time() - duration_ms = (end_time - start_time) * 1000 # Convert to milliseconds - logging.info(f"Finished '{stage_string}' in {duration_ms:.2f} ms.") - # Store the timing stats - try: - store_function_time( - user_id=user_id, - stage_string=stage_string, - ts=ts, - reading=duration_ms - ) - except Exception as storage_error: - logging.error(f"Failed to store timing stats for '{stage_string}': {storage_error}", exc_info=True) - if not success: - logging.warning(f"'{stage_string}' encountered an error.") - return wrapper - return decorator +def store_function_error(user_id, stage_string, ts, reading): + store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading) diff --git a/emission/tests/funcTests/TestFunctionTiming.py b/emission/tests/funcTests/TestFunctionTiming.py new file mode 100644 index 000000000..5e0cea46b --- /dev/null +++ b/emission/tests/funcTests/TestFunctionTiming.py @@ -0,0 +1,49 @@ +# emission/tests/funcTests/TestFunctionTiming.py + +import json +import logging +import logging.config +import os +import time +import numpy as np +import arrow +from contextlib import contextmanager + +# Import the run_function_pipeline function from time_functions.py +from emission.functions.time_functions import run_function_pipeline + +# Define test functions +def test_function_1(): + logging.info("Executing test_function_1") + time.sleep(1) # Simulate processing time + return True # Indicate successful execution + +def test_function_2(): + logging.info("Executing test_function_2") + time.sleep(1) + return True + +def test_function_faulty(): + logging.info("Executing test_function_faulty") + time.sleep(1) + raise ValueError("Simulated error in test_function_faulty") + +def test_function_3(): + logging.info("Executing test_function_3") + time.sleep(1) + return True + +if __name__ == "__main__": + # Ensure the logs directory exists + os.makedirs("logs", exist_ok=True) + + # Define the list of test functions, including the faulty one + function_list = [ + test_function_1, + test_function_2, + test_function_faulty, # This will raise an exception + test_function_3 # This should execute normally after the faulty function + ] + + # Run the pipeline with process number 1 and skip_if_no_new_data set to True + run_function_pipeline(process_number=1, function_list=function_list, skip_if_no_new_data=True)