Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats Timing #986

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions conf/log/function_pipeline.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"detailed": {
"class": "logging.Formatter",
"format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s"
}
},
"handlers": {
"errors": {
"class": "logging.handlers.RotatingFileHandler",
"level": "ERROR",
"formatter": "detailed",
"filename": "/var/tmp/function_pipeline-errors.log",
"mode": "a",
"maxBytes": 1073741824,
"backupCount": 2,
"encoding": "UTF-8"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "detailed",
"filename": "/var/tmp/function_pipeline.log",
"mode": "a",
"maxBytes": 1073741824,
"backupCount": 8,
"encoding": "UTF-8"
},
"console": {
"class": "logging.StreamHandler",
"level": "WARNING",
"formatter": "detailed",
"stream": "ext://sys.stdout"
}
},
"root": {
"handlers": [
"console",
"file",
"errors"
],
"level": "DEBUG"
}
}
2 changes: 2 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def _getData2Wrapper():
"stats/server_api_error": "statsevent",
# pipeline stage time, measured on the server
"stats/pipeline_time": "statsevent",
# function time, measured on the server
"stats/function_time": "statsevent",
# intended to log the occurrence of errors in the pipeline
"stats/pipeline_error": "statsevent",
# time for various client operations, measured on the client
Expand Down
82 changes: 82 additions & 0 deletions emission/functions/time_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import json
import logging
import logging.config
import os
import time
import numpy as np
import arrow
import pymongo

import emission.core.get_database as edb
import emission.core.timer as ect

import emission.storage.decorations.stats_queries as esds

def run_function_pipeline(process_number, function_list, skip_if_no_new_data=False):
    """
    Run the function pipeline with the specified process number and function list.
    Note that the process_number is only really used to customize the log file name
    We could avoid passing it in by using the process id - os.getpid() instead, but
    then we won't get the nice RotatingLogHandler properties such as auto-deleting
    files if there are too many. Maybe it will work properly with logrotate? Need to check

    :param process_number: id representing the process number. In range (0..n)
    :param function_list: the list of functions that this process will handle;
        entries may be None, which are skipped
    :param skip_if_no_new_data: flag to skip function execution based on custom logic
    :return: None
    """
    # Fall back to the sample config when the deployment-specific one is absent
    try:
        with open("conf/log/function_pipeline.conf", "r") as cf:
            pipeline_log_config = json.load(cf)
    except FileNotFoundError:
        with open("conf/log/function_pipeline.conf.sample", "r") as cf:
            pipeline_log_config = json.load(cf)

    # Customize log filenames with the process number so each worker process
    # writes to its own rotating log files
    for handler_name in ("file", "errors"):
        handler = pipeline_log_config["handlers"][handler_name]
        handler["filename"] = handler["filename"].replace(
            "function_pipeline", f"function_pipeline_{process_number}")

    logging.config.dictConfig(pipeline_log_config)
    np.random.seed(61297777)

    # Guard against None placeholders here too; the original comprehension
    # accessed func.__name__ unconditionally and would raise AttributeError
    logging.info("Processing function list: %s",
                 [func.__name__ for func in function_list if func is not None])

    for func in function_list:
        # BUG FIX: the None check must come *before* reading func.__name__;
        # the original read __name__ first, so a None entry crashed the loop
        # instead of being skipped
        if func is None:
            continue
        func_name = func.__name__

        try:
            run_function_pipeline_step(func, skip_if_no_new_data)
        except Exception as e:
            # Record the failure for monitoring, then move on to the next function
            esds.store_function_error(func_name, "WHOLE_PIPELINE", time.time(), None)
            logging.exception(f"Found error {e} while processing pipeline for function {func_name}, skipping")

def run_function_pipeline_step(func, skip_if_no_new_data):
    """
    Execute a single step in the function pipeline.

    :param func: The function to execute
    :param skip_if_no_new_data: Flag to determine if the function should be skipped based on custom logic
    :return: None
    """
    func_name = func.__name__

    # Time only the function call itself, not the bookkeeping around it
    with ect.Timer() as step_timer:
        logging.info(f"********** Function {func_name}: Starting execution **********")
        print(f"{arrow.now()} ********** Function {func_name}: Starting execution **********")
        result = func()

    # Persist the measured execution time for later stats analysis
    esds.store_function_time(func_name, "EXECUTION",
                             time.time(), step_timer.elapsed)

    # Guard clause: bail out early when the caller asked us to skip on
    # falsy results (the completion log below is intentionally not emitted)
    if skip_if_no_new_data and not result:
        print(f"No new data for {func_name}, and skip_if_no_new_data = {skip_if_no_new_data}, skipping the rest of the pipeline")
        return

    print(f"Function {func_name} executed with result = {result} and skip_if_no_new_data = {skip_if_no_new_data}, continuing")

    logging.info(f"********** Function {func_name}: Completed execution **********")
2 changes: 1 addition & 1 deletion emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
eaum.match_incoming_user_inputs(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USER_INPUT_MATCH_INCOMING.name,
time.time(), uct.elapsed)
time.time(), uit.elapsed)

# Hack until we delete these spurious entries
# https://github.com/e-mission/e-mission-server/issues/407#issuecomment-2484868
Expand Down
21 changes: 21 additions & 0 deletions emission/storage/decorations/stats_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from builtins import *
import logging
import time
from functools import wraps
from typing import Callable, Any

# Our imports
import emission.storage.timeseries.abstract_timeseries as esta
Expand Down Expand Up @@ -46,3 +48,22 @@ def store_stats_entry(user_id, metadata_key, name, ts, reading):
new_entry = ecwe.Entry.create_entry(user_id, metadata_key, data)
return esta.TimeSeries.get_time_series(user_id).insert(new_entry)

def store_function_time(user_id: str, stage_string: str, ts: float, reading: float):
    """
    Stores the execution time of a function.

    Parameters:
    - user_id (str): The ID of the user.
    - stage_string (str): The name of the function being timed.
    - ts (float): The timestamp when the function execution started.
    - reading (float): The duration of the function execution in milliseconds.

    Returns:
    - InsertResult: The result of the insert operation.
    """
    # BUG FIX: the docstring promises the insert result, but the original
    # discarded it (implicitly returning None) — propagate it to the caller,
    # consistent with store_stats_entry itself
    return store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)


def store_function_error(user_id, stage_string, ts, reading):
    """
    Stores an error marker for a function pipeline stage.

    Parameters:
    - user_id: The ID of the user (callers also pass the function name here).
    - stage_string: The name of the stage/step that failed.
    - ts: The timestamp at which the error was recorded.
    - reading: Associated measurement; may be None when no duration applies.

    Returns:
    - InsertResult: The result of the insert operation.
    """
    # NOTE(review): this reuses the "stats/function_time" metadata key instead
    # of a dedicated "stats/function_error" key — presumably intentional since
    # only "stats/function_time" is registered in entry.py/builtin_timeseries,
    # but confirm that timings and errors are meant to share one stream.
    # BUG FIX: propagate the insert result instead of silently returning None
    return store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)

1 change: 1 addition & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, user_id):
"stats/server_api_time": self.timeseries_db,
"stats/server_api_error": self.timeseries_db,
"stats/pipeline_time": self.timeseries_db,
"stats/function_time": self.timeseries_db,
"stats/pipeline_error": self.timeseries_db,
"stats/client_time": self.timeseries_db,
"stats/client_nav_event": self.timeseries_db,
Expand Down
49 changes: 49 additions & 0 deletions emission/tests/funcTests/TestFunctionTiming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# emission/tests/funcTests/TestFunctionTiming.py

import json
import logging
import logging.config
import os
import time
import numpy as np
import arrow
from contextlib import contextmanager

# Import the run_function_pipeline function from time_functions.py
from emission.functions.time_functions import run_function_pipeline

# Define test functions
def test_function_1():
    """Simulate a successful pipeline function: sleep briefly, report success."""
    logging.info("Executing test_function_1")
    time.sleep(1)  # pretend to do one second of real work
    return True

def test_function_2():
    """Second successful pipeline function: sleep briefly, report success."""
    logging.info("Executing test_function_2")
    time.sleep(1)  # pretend to do one second of real work
    return True

def test_function_faulty():
    """Deliberately failing pipeline function used to exercise error handling."""
    logging.info("Executing test_function_faulty")
    time.sleep(1)  # pretend to do one second of real work before failing
    raise ValueError("Simulated error in test_function_faulty")

def test_function_3():
    """Final successful pipeline function: sleep briefly, report success."""
    logging.info("Executing test_function_3")
    time.sleep(1)  # pretend to do one second of real work
    return True

if __name__ == "__main__":
# Ensure the logs directory exists
os.makedirs("logs", exist_ok=True)

# Define the list of test functions, including the faulty one
function_list = [
test_function_1,
test_function_2,
test_function_faulty, # This will raise an exception
test_function_3 # This should execute normally after the faulty function
]

# Run the pipeline with process number 1 and skip_if_no_new_data set to True
run_function_pipeline(process_number=1, function_list=function_list, skip_if_no_new_data=True)
Loading