
Stats Timing #986 (Draft)

TeachMeTW wants to merge 3 commits into master

No description provided.


TeachMeTW commented Oct 5, 2024

@shankari

The current implementation (commit d19392d) builds on the existing store_stats_entry code: store_function_time calls store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading).
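
For reference, the wrapper itself is a thin shim over store_stats_entry; a minimal sketch based on the description above (the exact code in d19392d may differ slightly):

def store_function_time(user_id, stage_string, ts, reading):
    # Reuse the generic stats storage path with the new key (sketch)
    store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)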

It takes a similar approach to run_intake_pipeline from intake_stage.py, but instead of a user_id it takes a function:

# Imports as I assume they appear in the e-mission-server codebase
import logging
import time

import arrow

import emission.core.timer as ect
import emission.storage.decorations.stats_queries as esds

def run_function_pipeline_step(func, skip_if_no_new_data):
    """
    Execute a single step in the function pipeline.

    :param func: The function to execute
    :param skip_if_no_new_data: Flag to determine if the function should be skipped based on custom logic
    :return: None
    """
    func_name = func.__name__

    with ect.Timer() as timer:
        logging.info(f"********** Function {func_name}: Starting execution **********")
        print(f"{arrow.now()} ********** Function {func_name}: Starting execution **********")
        result = func()

    # Store the execution time
    esds.store_function_time(func_name, "EXECUTION",
                             time.time(), timer.elapsed)

    if skip_if_no_new_data and not result:
        print(f"No new data for {func_name}, and skip_if_no_new_data = {skip_if_no_new_data}, skipping the rest of the pipeline")
        return
    else:
        print(f"Function {func_name} executed with result = {result} and skip_if_no_new_data = {skip_if_no_new_data}, continuing")

    logging.info(f"********** Function {func_name}: Completed execution **********")

An example use case:

import logging
import os
import time

# Define test functions
def test_function_1():
    logging.info("Executing test_function_1")
    time.sleep(1)  # Simulate processing time
    return True  # Indicate successful execution

def test_function_2():
    logging.info("Executing test_function_2")
    time.sleep(1)
    return True

def test_function_faulty():
    logging.info("Executing test_function_faulty")
    time.sleep(1)
    raise ValueError("Simulated error in test_function_faulty")

def test_function_3():
    logging.info("Executing test_function_3")
    time.sleep(1)
    return True

if __name__ == "__main__":
    # Ensure the logs directory exists
    os.makedirs("logs", exist_ok=True)
    
    # Define the list of test functions, including the faulty one
    function_list = [
        test_function_1,
        test_function_2,
        test_function_faulty,  # This will raise an exception
        test_function_3  # This should execute normally after the faulty function
    ]
    
    # Run the pipeline with process number 1 and skip_if_no_new_data set to True
    run_function_pipeline(process_number=1, function_list=function_list, skip_if_no_new_data=True)

Concerns:

Now I am unsure how to change the query from user_id to function_id (or whether to make a new DB collection for it within the timeseries); the metadata is right but the schema is not. I cannot access it in the notebook due to a KeyError (KeyError: 'stats/function_time'), but I can access it in Mongo via: db.Stage_timeseries.find({ "user_id": "test_function_1" }).pretty()

Output of the Find:

{
        "_id" : ObjectId("6700f706b37230e8be74d445"),
        "user_id" : "test_function_1",
        "metadata" : {
                "key" : "stats/function_time",
                "platform" : "server",
                "write_ts" : 1728116486.4357288,
                "time_zone" : "America/Los_Angeles",
                "write_local_dt" : {
                        "year" : 2024,
                        "month" : 10,
                        "day" : 5,
                        "hour" : 1,
                        "minute" : 21,
                        "second" : 26,
                        "weekday" : 5,
                        "timezone" : "America/Los_Angeles"
                },
                "write_fmt_time" : "2024-10-05T01:21:26.435729-07:00"
        },
        "data" : {
                "name" : "EXECUTION",
                "ts" : 1728116486.4328,
                "reading" : 1.001198583
        }
}
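
My guess (an assumption on my part, not verified) is that the notebook KeyError just means the new key is not yet registered in the key-to-wrapper map that the timeseries layer uses in emission/core/wrapper/entry.py, next to the existing stats keys:

# Sketch: the real map lives in emission/core/wrapper/entry.py; adding the
# new key next to the existing stats entries should let the timeseries layer
# wrap it instead of raising KeyError (assumption)
key_to_wrapper = {
    "stats/server_api_time": "statsevent",
    "stats/pipeline_time": "statsevent",
    "stats/function_time": "statsevent",  # proposed new entry
}

If that is the fix, reading the entries back through the timeseries API instead of raw Mongo would look something like:

import emission.storage.timeseries.abstract_timeseries as esta

# function name currently overloads user_id, as shown in the output above
ts = esta.TimeSeries.get_time_series("test_function_1")
entries = list(ts.find_entries(["stats/function_time"]))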

Questions:

1. How can I update the location / not use the same one as the pipeline time, or is it OK as-is?
2. Do I need extra steps for the aggregate?
3. Is this what you had envisioned?

I believe the payload/schema should be different, but I'm not sure what form you'd suggest.
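
For concreteness, one possible alternative payload (a strawman to react to, not what the commit currently writes) would keep the function name and stage inside data instead of overloading user_id:

# Strawman document shape (assumption): user_id stays None for server-level
# stats, and the function name moves into the data payload
proposed_entry = {
    "user_id": None,
    "metadata": {"key": "stats/function_time"},  # filled in by the timeseries layer as usual
    "data": {
        "name": "test_function_1",  # function name, instead of user_id
        "stage": "EXECUTION",
        "ts": 1728116486.4328,
        "reading": 1.001198583,
    },
}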
