enhance(apps/analytics): add scripts for the computation of aggregated analytics #4385

Merged: 2 commits, Dec 4, 2024
1 change: 1 addition & 0 deletions apps/analytics/src/modules/__init__.py
@@ -1 +1,2 @@
from .participant_analytics import compute_correctness, get_participant_responses
from .aggregated_analytics import compute_aggregated_analytics
4 changes: 4 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/__init__.py
@@ -0,0 +1,4 @@
from .compute_aggregated_analytics import compute_aggregated_analytics
from .load_participant_analytics import load_participant_analytics
from .aggregate_participant_analytics import aggregate_participant_analytics
from .save_aggregated_analytics import save_aggregated_analytics
29 changes: 29 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/aggregate_participant_analytics.py
@@ -0,0 +1,29 @@
def aggregate_participant_analytics(df_participant_analytics, verbose=False):
# if the dataframe is empty, return None
if df_participant_analytics.empty:
if verbose:
print("No participant analytics to aggregate")

return None

# aggregate all participant analytics for the specified time range, grouped by course
df_aggregated_analytics = (
df_participant_analytics.groupby("courseId")
.agg(
{
"id": "count",
"responseCount": "sum",
"totalScore": "sum",
"totalPoints": "sum",
"totalXp": "sum",
}
)
.reset_index()
.rename(
columns={
"id": "participantCount",
}
)
)

return df_aggregated_analytics
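
A minimal usage sketch of the aggregation above, assuming the re-export from the package __init__ and a sys.path that includes apps/analytics as in the notebook below (the DataFrame values are invented purely for illustration):

import pandas as pd

from src.modules.aggregated_analytics import aggregate_participant_analytics

# hypothetical participant analytics: two participants in course "c1", one in "c2"
df = pd.DataFrame(
    [
        {"id": "p1", "courseId": "c1", "responseCount": 10, "totalScore": 50, "totalPoints": 40, "totalXp": 100},
        {"id": "p2", "courseId": "c1", "responseCount": 5, "totalScore": 20, "totalPoints": 15, "totalXp": 60},
        {"id": "p3", "courseId": "c2", "responseCount": 8, "totalScore": 30, "totalPoints": 25, "totalXp": 80},
    ]
)

df_agg = aggregate_participant_analytics(df)
# one row per course; the "id" count is renamed to "participantCount":
#   c1 -> participantCount=2, responseCount=15, totalScore=70, totalPoints=55, totalXp=160
#   c2 -> participantCount=1, responseCount=8, totalScore=30, totalPoints=25, totalXp=80
print(df_agg)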
34 changes: 34 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/compute_aggregated_analytics.py
@@ -0,0 +1,34 @@
from .load_participant_analytics import load_participant_analytics
from .aggregate_participant_analytics import aggregate_participant_analytics
from .save_aggregated_analytics import save_aggregated_analytics


def compute_aggregated_analytics(
db, start_date, end_date, timestamp, analytics_type="DAILY", verbose=False
):
# load all participant analytics for the given timestamp and analytics time range
df_participant_analytics = load_participant_analytics(
db, timestamp, analytics_type, verbose
)

# aggregate all participant analytics values by course
df_aggregated_analytics = aggregate_participant_analytics(
df_participant_analytics, verbose
)

if df_aggregated_analytics is not None and verbose:
print("Aggregated analytics for time range: " + start_date + " to " + end_date)
print(df_aggregated_analytics.head())
elif df_aggregated_analytics is None:
print(
"No aggregated analytics to compute for time range: "
+ start_date
+ " to "
+ end_date
)

# store the computed aggregated analytics in the database
if df_aggregated_analytics is not None:
save_aggregated_analytics(
db, df_aggregated_analytics, timestamp, analytics_type
)
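
A hedged sketch of how this orchestration might be invoked for a single day outside the notebook below (the connection handling mirrors the notebook; the date is a placeholder):

from prisma import Prisma

from src.modules.aggregated_analytics import compute_aggregated_analytics

db = Prisma()
db.connect()

# placeholder day; per the notebook convention, the timestamp is the start of the day
day_start = "2024-12-01T00:00:00.000Z"
day_end = "2024-12-01T23:59:59.999Z"
compute_aggregated_analytics(db, day_start, day_end, day_start, "DAILY", verbose=True)

db.disconnect()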
30 changes: 30 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/load_participant_analytics.py
@@ -0,0 +1,30 @@
import pandas as pd


def convert_to_df(analytics):
# convert the database query result into a pandas dataframe
rows = []
for item in analytics:
rows.append(dict(item))

return pd.DataFrame(rows)


def load_participant_analytics(db, timestamp, analytics_type, verbose=False):
participant_analytics = db.participantanalytics.find_many(
where={"timestamp": timestamp, "type": analytics_type},
)

if verbose:
# print the number of loaded analytics and a sample entry
print(
"Found {} analytics for timestamp {} and type {}".format(
len(participant_analytics), timestamp, analytics_type
)
)
if len(participant_analytics) > 0:
print(participant_analytics[0])

# convert the analytics to a dataframe
df_loaded_analytics = convert_to_df(participant_analytics)

return df_loaded_analytics
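
As a small illustration of convert_to_df, plain dicts can stand in for the Prisma records, since dict(item) accepts both (values below are invented):

from src.modules.aggregated_analytics.load_participant_analytics import convert_to_df

sample = [
    {"id": "p1", "courseId": "c1", "responseCount": 10},
    {"id": "p2", "courseId": "c1", "responseCount": 5},
]
df = convert_to_df(sample)
print(df)  # two rows with columns id, courseId, responseCount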
108 changes: 108 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/save_aggregated_analytics.py
@@ -0,0 +1,108 @@
from datetime import datetime


def save_aggregated_analytics(db, df_analytics, timestamp, analytics_type="DAILY"):
computedAt = datetime.now().strftime("%Y-%m-%d") + "T00:00:00.000Z"

# create daily / weekly / monthly aggregated analytics entries for all courses
if analytics_type in ["DAILY", "WEEKLY", "MONTHLY"]:
for _, row in df_analytics.iterrows():
db.aggregatedanalytics.upsert(
where={
"type_courseId_timestamp": {
"type": analytics_type,
"courseId": row["courseId"],
"timestamp": timestamp,
}
},
data={
"create": {
"type": analytics_type,
"timestamp": timestamp,
"computedAt": computedAt,
"participantCount": row["participantCount"],
"responseCount": row["responseCount"],
"totalScore": row["totalScore"],
"totalPoints": row["totalPoints"],
"totalXp": row["totalXp"],
# TODO: set this value correctly for rolling updates in production code
# (cannot be computed for past learning analytics -> therefore set to invalid value)
"totalElementsAvailable": -1,
"course": {"connect": {"id": row["courseId"]}},
},
"update": {},
},
)

# create or update course-wide analytics entries (exactly one entry per course by definition)
elif analytics_type == "COURSE":
for _, row in df_analytics.iterrows():
course = db.course.find_unique_or_raise(
where={"id": row["courseId"]},
include={
"practiceQuizzes": {
"include": {
"stacks": {
"include": {"elements": True},
}
}
},
"microLearnings": {
"include": {
"stacks": {
"include": {"elements": True},
}
}
},
},
)
course = dict(course)

# sum the number of elements across all practice quizzes and microlearnings
totalElementsAvailable = 0
for practice_quiz in course["practiceQuizzes"]:
pq_dict = dict(practice_quiz)
for stack in pq_dict["stacks"]:
stack_dict = dict(stack)
totalElementsAvailable += len(stack_dict["elements"])
for microlearning in course["microLearnings"]:
ml_dict = dict(microlearning)
for stack in ml_dict["stacks"]:
stack_dict = dict(stack)
totalElementsAvailable += len(stack_dict["elements"])

db.aggregatedanalytics.upsert(
where={
"type_courseId_timestamp": {
"type": analytics_type,
"courseId": row["courseId"],
"timestamp": timestamp,
}
},
data={
"create": {
"type": analytics_type,
"timestamp": timestamp,
"computedAt": computedAt,
"participantCount": row["participantCount"],
"responseCount": row["responseCount"],
"totalScore": row["totalScore"],
"totalPoints": row["totalPoints"],
"totalXp": row["totalXp"],
"totalElementsAvailable": totalElementsAvailable,
"course": {"connect": {"id": row["courseId"]}},
},
"update": {
"computedAt": computedAt,
"participantCount": row["participantCount"],
"responseCount": row["responseCount"],
"totalScore": row["totalScore"],
"totalPoints": row["totalPoints"],
"totalXp": row["totalXp"],
"totalElementsAvailable": totalElementsAvailable,
},
},
)

else:
raise ValueError("Unknown analytics type: {}".format(analytics_type))
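
The element-counting loops above could be expressed more compactly. A behavior-preserving sketch, where count_available_elements is a hypothetical helper assuming the same include structure as the query above:

def count_available_elements(course: dict) -> int:
    # sum the elements across all stacks of all practice quizzes and microlearnings
    total = 0
    for key in ("practiceQuizzes", "microLearnings"):
        for activity in course[key]:
            for stack in dict(activity)["stacks"]:
                total += len(dict(stack)["elements"])
    return total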
161 changes: 161 additions & 0 deletions apps/analytics/src/notebooks/aggregated_analytics.ipynb
@@ -0,0 +1,161 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Preparation\n",
"\n",
"This script computes analytics that are aggregated over all participants in a course over a specified time span. The corresponding results are stored in the `AggregatedAnalytics` database table. While the daily, weekly and monthly scripts are designed to run at the end of the corresponding period (only creation, no updates), the course analytics are once more meant to be updated on a regular basis (daily?). Minor changes to the calling logic of these functions will be required for continuous updates."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"from datetime import datetime\n",
"from prisma import Prisma\n",
"import pandas as pd\n",
"\n",
"# set the python path correctly for module imports to work\n",
"import sys\n",
"sys.path.append('../../')\n",
"\n",
"from src.modules.aggregated_analytics.compute_aggregated_analytics import compute_aggregated_analytics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"db = Prisma()\n",
"\n",
"# set the environment variable DATABASE_URL to the connection string of your database\n",
"os.environ['DATABASE_URL'] = 'postgresql://klicker:klicker@localhost:5432/klicker-prod'\n",
"\n",
"db.connect()\n",
"\n",
"# Script settings\n",
"verbose = False\n",
"\n",
"# Settings which analytics to compute\n",
"compute_daily = True\n",
"compute_weekly = True\n",
"compute_monthly = True\n",
"compute_course = True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Compute Aggregated Analytics on Course Level\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_date = \"2021-01-01\"\n",
"end_date = datetime.now().strftime(\"%Y-%m-%d\")\n",
"date_range_daily = pd.date_range(start=start_date, end=end_date, freq=\"D\")\n",
"date_range_weekly = pd.date_range(start=start_date, end=end_date, freq=\"W\")\n",
"date_range_monthly = pd.date_range(start=start_date, end=end_date, freq=\"ME\")\n",
"\n",
"if compute_daily:\n",
" # Iterate over the date range and compute the participant analytics for each day\n",
" for curr_date in date_range_daily:\n",
" # determine day start and end dates required for aggregation\n",
" specific_date = curr_date.strftime(\"%Y-%m-%d\")\n",
" day_start = specific_date + \"T00:00:00.000Z\"\n",
" day_end = specific_date + \"T23:59:59.999Z\"\n",
" print(f\"Computing daily aggregated analytics (course) for {specific_date}\")\n",
"\n",
" # compute aggregated analytics for a specific day\n",
" timestamp = day_start\n",
" compute_aggregated_analytics(\n",
" db, day_start, day_end, timestamp, \"DAILY\", verbose\n",
" )\n",
"\n",
"\n",
"if compute_weekly:\n",
" # Iterate over the date range and compute the participant analytics for each week\n",
" for curr_date in date_range_weekly:\n",
" # determine week start and end dates required for aggregation\n",
" week_end = curr_date.strftime(\"%Y-%m-%d\") + \"T23:59:59.999Z\"\n",
" week_start = (curr_date - pd.DateOffset(days=6)).strftime(\n",
" \"%Y-%m-%d\"\n",
" ) + \"T00:00:00.000Z\"\n",
" print(\n",
" f\"Computing weekly aggregated analytics (course) for {week_start } to {week_end }\"\n",
" )\n",
"\n",
" # compute aggregated analytics for a specific week\n",
" timestamp = week_end\n",
" compute_aggregated_analytics(\n",
" db, week_start, week_end, timestamp, \"WEEKLY\", verbose\n",
" )\n",
"\n",
"\n",
"if compute_monthly:\n",
" # Iterate over the date range and compute the participant analytics for each month\n",
" for curr_date in date_range_monthly:\n",
" # determine month start and end dates required for aggregation\n",
" month_end = curr_date.strftime('%Y-%m-%d') + 'T23:59:59.999Z'\n",
" month_start = (curr_date - pd.offsets.MonthBegin(1)).strftime('%Y-%m-%d') + 'T00:00:00.000Z'\n",
" print(\n",
" f\"Computing monthly aggregated analytics (course) for {month_start } to {month_end }\"\n",
" )\n",
"\n",
" # compute aggregated analytics for a specific month\n",
" timestamp = month_end\n",
" compute_aggregated_analytics(\n",
" db, month_start, month_end, timestamp, \"MONTHLY\", verbose\n",
" )\n",
"\n",
"\n",
"if compute_course:\n",
" print(\n",
" f\"Computing course-wide aggregated analytics\"\n",
" )\n",
"\n",
" # compute aggregated analytics over entire course based on corresponding participant analytics\n",
" # (a constant timestamp is used here, since the data combination has to be unique \n",
" # during querying, but only one entry per course is available by definition)\n",
" timestamp = \"1970-01-01T00:00:00.000Z\"\n",
" compute_aggregated_analytics(\n",
" db, timestamp, timestamp, timestamp, \"COURSE\", verbose\n",
" )"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "analytics-fkWWeYLw-py3.12",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
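
The preparation cell notes that continuous updates would require minor changes to the calling logic. A minimal sketch of what such a rolling daily job might look like, under the assumption that the empty "update" branch of the daily upsert above is also populated (otherwise re-running a day has no effect):

from datetime import datetime, timezone

from prisma import Prisma

from src.modules.aggregated_analytics import compute_aggregated_analytics

# hypothetical rolling job: recompute the current day's aggregates on every run
db = Prisma()
db.connect()

today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
day_start = today + "T00:00:00.000Z"
day_end = today + "T23:59:59.999Z"
compute_aggregated_analytics(db, day_start, day_end, day_start, "DAILY")

db.disconnect()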